# Chapter 12: Automating Data Pipelines with Apache Airflow

This notebook accompanies Chapter 12 of the O'Reilly AutoML book. It demonstrates practical patterns for building data pipelines that support AutoML workflows, including:

1. Data validation and quality gates
2. Point-in-time correct feature engineering
3. Incremental processing patterns
4. Data and concept drift detection
5. Data contracts and schema validation

**Note:** Some code demonstrates Airflow DAG patterns that would run in an Airflow environment. These are provided for reference and can be adapted to your deployment.

**Prerequisites:**
- Python 3.10+
- pandas, numpy, scipy
- pyarrow (or fastparquet), needed by `DataFrame.to_parquet` in Section 12.9
- scikit-learn
- (Optional) apache-airflow 2.4+ for full DAG execution

---
## Section 12.1: Environment Setup

In [None]:
# Install required packages (uncomment if needed)
# !pip install pandas numpy scipy scikit-learn pyarrow

In [None]:
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from scipy import stats
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")
print("Environment ready for pipeline demonstrations")

---
## Section 12.2: Sample Data Generation

We'll create synthetic customer and transaction data to demonstrate pipeline patterns.

In [None]:
# Generate sample customer data
np.random.seed(42)
n_customers = 1000

customers_df = pd.DataFrame({
    'customer_id': [f'CUST_{i:05d}' for i in range(n_customers)],
    'registration_date': pd.date_range('2020-01-01', periods=n_customers, freq='2D'),
    'customer_segment': np.random.choice(['premium', 'standard', 'basic'], n_customers, p=[0.1, 0.6, 0.3]),
    'lifetime_value': np.random.exponential(500, n_customers),
    'last_login_date': pd.Timestamp.now() - pd.to_timedelta(np.random.randint(0, 90, n_customers), unit='D'),
    'email_verified': np.random.choice([True, False], n_customers, p=[0.9, 0.1]),
    'updated_at': pd.Timestamp.now() - pd.to_timedelta(np.random.randint(0, 30, n_customers), unit='D')
})

print(f"Created {len(customers_df)} customer records")
customers_df.head()

In [None]:
# Generate sample transaction data
n_transactions = 10000

transactions_df = pd.DataFrame({
    'transaction_id': [f'TXN_{i:08d}' for i in range(n_transactions)],
    'customer_id': np.random.choice(customers_df['customer_id'], n_transactions),
    'transaction_timestamp': pd.date_range('2024-01-01', periods=n_transactions, freq='30min'),
    'amount': np.random.lognormal(3, 1, n_transactions),
    'product_category': np.random.choice(['electronics', 'clothing', 'food', 'services'], n_transactions)
})

print(f"Created {len(transactions_df)} transaction records")
transactions_df.head()

---
## Section 12.3: Point-in-Time Correct Feature Engineering

One of the most critical aspects of ML data pipelines is ensuring **point-in-time correctness** - features should only use data that would have been available at prediction time.

### Snippet 12-1: Wrong vs Right Feature Engineering

In [None]:
# Snippet 12-1: Point-in-Time Correct Feature Engineering

def create_features_WRONG(transactions_df, prediction_date):
    """
    BAD EXAMPLE: Uses ALL transactions, including FUTURE data.
    This causes data leakage!
    """
    # BUG: No date filtering - includes future transactions
    features = transactions_df.groupby('customer_id').agg({
        'amount': ['sum', 'mean', 'count'],
        'transaction_timestamp': 'max'
    })
    features.columns = ['total_spend', 'avg_spend', 'txn_count', 'last_txn']
    return features.reset_index()


def create_features_CORRECT(transactions_df, prediction_date, lookback_days=30):
    """
    GOOD EXAMPLE: Only uses data BEFORE the prediction date.
    This ensures point-in-time correctness.
    """
    # Filter to only historical transactions
    cutoff_date = prediction_date
    lookback_start = prediction_date - timedelta(days=lookback_days)
    
    historical_txns = transactions_df[
        (transactions_df['transaction_timestamp'] >= lookback_start) &
        (transactions_df['transaction_timestamp'] < cutoff_date)  # Strict < not <=
    ]
    
    features = historical_txns.groupby('customer_id').agg({
        'amount': ['sum', 'mean', 'count'],
        'transaction_timestamp': 'max'
    })
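    # Name features after the actual lookback window (e.g. total_spend_30d)
    suffix = f"{lookback_days}d"
    features.columns = [f'total_spend_{suffix}', f'avg_spend_{suffix}', f'txn_count_{suffix}', 'last_txn_date']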
    
    # Add metadata for auditability
    features = features.reset_index()
    features['feature_as_of_date'] = prediction_date
    features['lookback_days'] = lookback_days
    
    return features


# Demonstrate the difference
prediction_date = pd.Timestamp('2024-06-15')

wrong_features = create_features_WRONG(transactions_df, prediction_date)
correct_features = create_features_CORRECT(transactions_df, prediction_date)

print(f"WRONG approach: {len(wrong_features)} customers, uses ALL {len(transactions_df)} transactions")
print(f"CORRECT approach: {len(correct_features)} customers, uses only historical transactions")
print(f"\nCorrect features sample:")
correct_features.head()
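`create_features_CORRECT` builds features for a single prediction date. Training sets usually need a different "as of" time per label row. A common way to do that in pandas is `pd.merge_asof`, which attaches, for each label, the most recent feature snapshot before its timestamp. A minimal sketch, assuming hypothetical `labels` and `feature_snapshots` tables where each snapshot is stamped with the time it became available:

```python
import pandas as pd

# Hypothetical label events: one row per (customer, prediction time)
labels = pd.DataFrame({
    "customer_id": ["C1", "C1", "C2"],
    "prediction_ts": pd.to_datetime(["2024-01-05", "2024-01-10", "2024-01-07"]),
    "churned": [0, 1, 0],
})

# Hypothetical feature snapshots, stamped with when they became available
feature_snapshots = pd.DataFrame({
    "customer_id": ["C1", "C1", "C2", "C2"],
    "feature_ts": pd.to_datetime(["2024-01-01", "2024-01-10", "2024-01-03", "2024-01-08"]),
    "total_spend_30d": [100.0, 250.0, 40.0, 90.0],
})

# merge_asof requires both frames to be sorted by their time keys
training = pd.merge_asof(
    labels.sort_values("prediction_ts"),
    feature_snapshots.sort_values("feature_ts"),
    left_on="prediction_ts",
    right_on="feature_ts",
    by="customer_id",           # only match snapshots of the same customer
    direction="backward",       # latest snapshot before the prediction time...
    allow_exact_matches=False,  # ...strictly before, mirroring the `<` cutoff above
)
print(training[["customer_id", "prediction_ts", "feature_ts", "total_spend_30d"]])
```

Note that the C1 label at 2024-01-10 receives the 2024-01-01 snapshot, not the one stamped 2024-01-10, because `allow_exact_matches=False` enforces the same strict cutoff as Snippet 12-1.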

---
## Section 12.4: Data Quality Validation

Production pipelines need comprehensive validation to catch data quality issues before they corrupt downstream processing.

### Snippet 12-2: Hierarchical Data Validation

In [None]:
# Snippet 12-2: Hierarchical Data Quality Validation

class DataValidator:
    """Comprehensive data quality validation with hierarchical checks."""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.results = {
            'passed': [],
            'failed': [],
            'warnings': []
        }
    
    def check_schema(self, required_columns: List[str]) -> bool:
        """Level 1: Quick schema validation (fail fast)."""
        missing = set(required_columns) - set(self.df.columns)
        if missing:
            self.results['failed'].append(f"Missing columns: {missing}")
            return False
        self.results['passed'].append("Schema validation passed")
        return True
    
    def check_nulls(self, critical_columns: List[str], max_null_rate: float = 0.05) -> bool:
        """Level 2: Null rate validation."""
        all_passed = True
        for col in critical_columns:
            if col in self.df.columns:
                null_rate = self.df[col].isnull().mean()
                if null_rate > max_null_rate:
                    self.results['failed'].append(
                        f"Column '{col}' null rate {null_rate:.2%} > {max_null_rate:.2%}"
                    )
                    all_passed = False
        if all_passed:
            self.results['passed'].append("Null validation passed")
        return all_passed
    
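    def check_duplicates(self, key_columns: List[str]) -> bool:
        """Level 3: Duplicate detection on the key columns."""
        if not key_columns:
            # Without key columns there is no notion of a duplicate record
            self.results['warnings'].append("Duplicate check skipped: no key_columns configured")
            return True
        duplicates = self.df.duplicated(subset=key_columns).sum()
        if duplicates > 0:
            self.results['failed'].append(f"Found {duplicates} duplicate records")
            return False
        self.results['passed'].append("Duplicate check passed")
        return True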
    
    def check_value_ranges(self, column: str, min_val: Optional[float] = None,
                           max_val: Optional[float] = None) -> bool:
        """Level 4: Value range validation."""
        if column not in self.df.columns:
            return True
        
        if min_val is not None:
            violations = (self.df[column] < min_val).sum()
            if violations > 0:
                self.results['warnings'].append(
                    f"{violations} records with {column} < {min_val}"
                )
        
        if max_val is not None:
            violations = (self.df[column] > max_val).sum()
            if violations > 0:
                self.results['warnings'].append(
                    f"{violations} records with {column} > {max_val}"
                )
        
        return True
    
    def check_volume(self, min_records: int) -> bool:
        """Level 5: Volume validation."""
        if len(self.df) < min_records:
            self.results['failed'].append(
                f"Record count {len(self.df)} < minimum {min_records}"
            )
            return False
        self.results['passed'].append(f"Volume check passed ({len(self.df)} records)")
        return True
    
    def validate_all(self, config: Dict[str, Any]) -> Dict:
        """Run all validations based on config."""
        # Schema failures short-circuit; the remaining checks run in rough
        # order of cost (O(1) volume, per-column null rates, key hashing)
        schema_ok = self.check_schema(config.get('required_columns', []))
        if not schema_ok:
            return {'success': False, 'results': self.results}
        
        self.check_volume(config.get('min_records', 1))
        self.check_nulls(config.get('critical_columns', []))
        self.check_duplicates(config.get('key_columns', []))
        
        for range_check in config.get('value_ranges', []):
            self.check_value_ranges(**range_check)
        
        return {
            'success': len(self.results['failed']) == 0,
            'results': self.results
        }


# Run validation on customer data
validation_config = {
    'required_columns': ['customer_id', 'customer_segment', 'lifetime_value'],
    'key_columns': ['customer_id'],
    'critical_columns': ['customer_id', 'customer_segment'],
    'min_records': 100,
    'value_ranges': [
        {'column': 'lifetime_value', 'min_val': 0},
    ]
}

validator = DataValidator(customers_df)
validation_result = validator.validate_all(validation_config)

print("Validation Report:")
print(f"Success: {validation_result['success']}")
print(f"\nPassed: {validation_result['results']['passed']}")
print(f"Failed: {validation_result['results']['failed']}")
print(f"Warnings: {validation_result['results']['warnings']}")
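`DataValidator` only reports problems. To act as a quality gate, the pipeline must stop when validation fails. In Airflow, a task that raises an exception is marked failed, and downstream tasks with the default `all_success` trigger rule do not run. A minimal sketch, assuming the result shape returned by `validate_all`; `DataQualityError` and `enforce_quality_gate` are hypothetical names:

```python
from typing import Any, Dict


class DataQualityError(Exception):
    """Hypothetical exception raised when a quality gate blocks the pipeline."""


def enforce_quality_gate(validation_result: Dict[str, Any],
                         fail_on_warnings: bool = False) -> None:
    """Raise if validation failed, so the orchestrator halts downstream tasks.

    Expects the shape returned by DataValidator.validate_all:
    {'success': bool, 'results': {'passed': [...], 'failed': [...], 'warnings': [...]}}
    """
    results = validation_result["results"]
    problems = list(results["failed"])
    if fail_on_warnings:
        # Stricter gates can treat warnings (e.g. out-of-range values) as blocking
        problems.extend(results["warnings"])
    if problems:
        raise DataQualityError("Quality gate failed: " + "; ".join(problems))


ok = {
    "success": True,
    "results": {"passed": ["Schema validation passed"], "failed": [],
                "warnings": ["3 records with lifetime_value < 0"]},
}
enforce_quality_gate(ok)  # passes: only a warning was recorded
try:
    enforce_quality_gate(ok, fail_on_warnings=True)
except DataQualityError as exc:
    print(exc)
```

Whether warnings should block is a policy choice; the `fail_on_warnings` flag keeps that decision explicit in the pipeline code.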

---
## Section 12.5: Data Contracts

Data contracts provide a systematic approach to handling schema evolution and ensuring data quality at system boundaries.

### Snippet 12-3: Data Contract Definition and Validation

In [None]:
# Snippet 12-3: Data Contracts

@dataclass
class FieldDefinition:
    """Definition of a single field in a data contract."""
    name: str
    dtype: str  # 'string', 'int', 'float', 'datetime', 'bool' (documentary; validate() does not enforce it)
    required: bool = True
    nullable: bool = False
    allowed_values: Optional[List] = None
    min_value: Optional[float] = None
    max_value: Optional[float] = None


@dataclass
class DataContract:
    """Data contract specification."""
    name: str
    version: str
    fields: List[FieldDefinition]
    primary_key: List[str]
    
    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Validate a DataFrame against this contract."""
        violations = []
        
        # Check each field: presence, nullability, allowed values, numeric range
        for field in self.fields:
            if field.required and field.name not in df.columns:
                violations.append(f"Missing required field: {field.name}")
                continue
            
            if field.name not in df.columns:
                continue
                
            col = df[field.name]
            
            # Check nullability
            if not field.nullable and col.isnull().any():
                null_count = col.isnull().sum()
                violations.append(f"Field '{field.name}' has {null_count} null values")
            
            # Check allowed values
            if field.allowed_values is not None:
                invalid = ~col.dropna().isin(field.allowed_values)
                if invalid.any():
                    invalid_values = col.dropna()[invalid].unique()[:5]
                    violations.append(
                        f"Field '{field.name}' has invalid values: {invalid_values}"
                    )
            
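            # Check numeric ranges
            if field.min_value is not None:
                below_min = (col < field.min_value).sum()
                if below_min > 0:
                    violations.append(
                        f"Field '{field.name}' has {below_min} values below {field.min_value}"
                    )
            if field.max_value is not None:
                above_max = (col > field.max_value).sum()
                if above_max > 0:
                    violations.append(
                        f"Field '{field.name}' has {above_max} values above {field.max_value}"
                    )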
        
        # Check primary key uniqueness
        if all(pk in df.columns for pk in self.primary_key):
            duplicates = df.duplicated(subset=self.primary_key).sum()
            if duplicates > 0:
                violations.append(f"Primary key has {duplicates} duplicates")
        
        return {
            'contract': f"{self.name} v{self.version}",
            'valid': len(violations) == 0,
            'violations': violations
        }


# Define customer data contract
CUSTOMER_CONTRACT = DataContract(
    name="customer_data",
    version="1.0",
    fields=[
        FieldDefinition('customer_id', 'string', required=True, nullable=False),
        FieldDefinition('customer_segment', 'string', required=True, nullable=False,
                       allowed_values=['premium', 'standard', 'basic']),
        FieldDefinition('lifetime_value', 'float', required=True, nullable=True,
                       min_value=0),
        FieldDefinition('email_verified', 'bool', required=False, nullable=True),
    ],
    primary_key=['customer_id']
)

# Validate our data against the contract
contract_result = CUSTOMER_CONTRACT.validate(customers_df)
print(f"Contract: {contract_result['contract']}")
print(f"Valid: {contract_result['valid']}")
if contract_result['violations']:
    print(f"Violations: {contract_result['violations']}")

---
## Section 12.6: Data Drift Detection

Monitoring for data drift is essential for production ML systems. We need to detect both **data drift** (input distributions changing) and **concept drift** (the relationship between inputs and outputs changing).
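Snippet 12-4 scores each numeric feature with the Population Stability Index (PSI) alongside a Kolmogorov-Smirnov test. With the value range split into $B$ bins, $n_i$ the count of values in bin $i$, $N$ the sample size, and $\varepsilon = 0.001$ the smoothing constant used in `calculate_psi`, PSI compares the smoothed bin fractions of the reference ($r_i$) and current ($c_i$) samples:

$$
\mathrm{PSI} = \sum_{i=1}^{B} (c_i - r_i)\,\ln\frac{c_i}{r_i},
\qquad
r_i = \frac{n^{\mathrm{ref}}_i + \varepsilon}{N_{\mathrm{ref}} + B\varepsilon},
\qquad
c_i = \frac{n^{\mathrm{cur}}_i + \varepsilon}{N_{\mathrm{cur}} + B\varepsilon}
$$

Each term is non-negative because $c_i - r_i$ and $\ln(c_i/r_i)$ always share a sign, so PSI is zero only when the binned distributions match. A commonly quoted rule of thumb (a heuristic, not a statistical test) reads PSI below 0.1 as stable, 0.1 to 0.25 as a moderate shift, and above 0.25 as a significant shift, which is where the snippet's 0.2 threshold and 0.25 severity cutoff come from.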

### Snippet 12-4: Statistical Drift Detection

In [None]:
# Snippet 12-4: Data Drift Detection

class DriftDetector:
    """Detect data drift in numeric features (concept drift requires labels and is not covered here)."""
    
    def __init__(self, reference_df: pd.DataFrame, numeric_columns: List[str]):
        self.reference_df = reference_df
        self.numeric_columns = numeric_columns
        self.reference_stats = self._compute_reference_stats()
    
    def _compute_reference_stats(self) -> Dict:
        """Compute reference statistics for comparison."""
        # Named ref_stats so it does not shadow the scipy.stats module
        ref_stats = {}
        for col in self.numeric_columns:
            if col in self.reference_df.columns:
                data = self.reference_df[col].dropna()
                ref_stats[col] = {
                    'mean': data.mean(),
                    'std': data.std(),
                    'median': data.median(),
                    'q25': data.quantile(0.25),
                    'q75': data.quantile(0.75)
                }
        return ref_stats
    
    def calculate_psi(self, reference: pd.Series, current: pd.Series, 
                     n_bins: int = 10) -> float:
        """Calculate Population Stability Index (PSI)."""
        # Equal-width bins spanning the combined range of both samples
        min_val = min(reference.min(), current.min())
        max_val = max(reference.max(), current.max())
        bins = np.linspace(min_val, max_val, n_bins + 1)
        
        # Count values per bin
        ref_counts, _ = np.histogram(reference, bins=bins)
        curr_counts, _ = np.histogram(current, bins=bins)
        
        # Smoothed proportions: empty bins would otherwise cause division by zero and log(0)
        ref_pct = (ref_counts + 0.001) / (len(reference) + 0.001 * n_bins)
        curr_pct = (curr_counts + 0.001) / (len(current) + 0.001 * n_bins)
        
        # Calculate PSI
        psi = np.sum((curr_pct - ref_pct) * np.log(curr_pct / ref_pct))
        return psi
    
    def detect_data_drift(self, current_df: pd.DataFrame, 
                         psi_threshold: float = 0.2,
                         ks_alpha: float = 0.05) -> Dict:
        """Detect data drift using multiple statistical tests."""
        drift_report = {
            'drift_detected': False,
            'feature_results': {},
            'alerts': []
        }
        
        for col in self.numeric_columns:
            if col not in current_df.columns:
                continue
            
            ref_data = self.reference_df[col].dropna()
            curr_data = current_df[col].dropna()
            
            # PSI test
            psi = self.calculate_psi(ref_data, curr_data)
            
            # Kolmogorov-Smirnov test
            ks_stat, ks_pvalue = stats.ks_2samp(ref_data, curr_data)
            
            # Mean shift
            ref_mean = ref_data.mean()
            curr_mean = curr_data.mean()
            mean_shift = (curr_mean - ref_mean) / ref_mean if ref_mean != 0 else 0
            
            feature_result = {
                'psi': psi,
                'psi_drift': psi > psi_threshold,
                'ks_statistic': ks_stat,
                'ks_pvalue': ks_pvalue,
                'ks_drift': ks_pvalue < ks_alpha,
                'mean_shift_pct': mean_shift * 100
            }
            
            drift_report['feature_results'][col] = feature_result
            
            # Generate alerts
            if psi > psi_threshold:
                drift_report['drift_detected'] = True
                severity = 'HIGH' if psi > 0.25 else 'MEDIUM'
                drift_report['alerts'].append(
                    f"{severity}: {col} PSI={psi:.3f} exceeds threshold {psi_threshold}"
                )
            
            if ks_pvalue < ks_alpha:
                drift_report['drift_detected'] = True
                drift_report['alerts'].append(
                    f"STATISTICAL: {col} KS test p-value={ks_pvalue:.4f} < {ks_alpha}"
                )
        
        return drift_report


# Create reference data (first half) and current data (second half with drift)
n_half = len(customers_df) // 2
reference_customers = customers_df.iloc[:n_half].copy()
current_customers = customers_df.iloc[n_half:].copy()

# Introduce artificial drift in current data
current_customers['lifetime_value'] = current_customers['lifetime_value'] * 1.3 + 100

# Detect drift
detector = DriftDetector(
    reference_customers, 
    numeric_columns=['lifetime_value']
)

drift_report = detector.detect_data_drift(current_customers)

print("Drift Detection Report:")
print(f"Drift Detected: {drift_report['drift_detected']}")
print(f"\nFeature Results:")
for feature, results in drift_report['feature_results'].items():
    print(f"  {feature}:")
    print(f"    PSI: {results['psi']:.4f} (drift: {results['psi_drift']})")
    print(f"    KS p-value: {results['ks_pvalue']:.4f} (drift: {results['ks_drift']})")
    print(f"    Mean shift: {results['mean_shift_pct']:.1f}%")
print(f"\nAlerts: {drift_report['alerts']}")
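`DriftDetector` only compares input distributions. Concept drift needs ground-truth labels, because it is a change in the relationship between inputs and outcomes. One simple proxy, once labels arrive, is to test whether the model's error rate changed between a reference window and a current window. A minimal sketch using a two-sided two-proportion z-test; `error_rate_shift` is a hypothetical helper, and a shifted error rate can also be caused by data drift, so treat it as a signal to investigate rather than a diagnosis:

```python
import math
from typing import Tuple


def error_rate_shift(ref_errors: int, ref_n: int,
                     cur_errors: int, cur_n: int) -> Tuple[float, float]:
    """Two-proportion z-test on model error rates between two labelled windows.

    Returns (z, two-sided p-value). Positive z means the current window has more errors.
    """
    p_ref = ref_errors / ref_n
    p_cur = cur_errors / cur_n
    pooled = (ref_errors + cur_errors) / (ref_n + cur_n)
    se = math.sqrt(pooled * (1 - pooled) * (1 / ref_n + 1 / cur_n))
    z = (p_cur - p_ref) / se if se > 0 else 0.0
    # Two-sided p-value: 2 * (1 - Phi(|z|)) == erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Error rate rose from 5% to 9% on 1,000 labelled predictions per window
z, p = error_rate_shift(ref_errors=50, ref_n=1000, cur_errors=90, cur_n=1000)
print(f"z={z:.2f}, p={p:.5f}")
```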

---
## Section 12.7: Late-Arriving Data Handling

Incremental pipelines must handle records that arrive after their logical time window has been processed.
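The usual starting point for incremental processing is a high-watermark extract: store the largest `updated_at` seen so far and fetch only rows changed after it. A minimal sketch; `incremental_extract` is a hypothetical helper, and `updated_at` matches the column in `customers_df`:

```python
from typing import Optional, Tuple

import pandas as pd


def incremental_extract(source: pd.DataFrame, watermark: Optional[pd.Timestamp],
                        ts_col: str = "updated_at") -> Tuple[pd.DataFrame, Optional[pd.Timestamp]]:
    """Return rows changed after `watermark`, plus the watermark to store for the next run."""
    batch = source if watermark is None else source[source[ts_col] > watermark]
    # Advance the watermark only when something new arrived
    new_watermark = batch[ts_col].max() if not batch.empty else watermark
    return batch, new_watermark


source = pd.DataFrame({
    "customer_id": ["C1", "C2", "C3"],
    "updated_at": pd.to_datetime(["2024-06-01", "2024-06-02", "2024-06-03"]),
})
first, wm = incremental_extract(source, None)   # initial full load
second, wm2 = incremental_extract(source, wm)   # nothing changed since
print(len(first), len(second), wm2)
```

The strict `>` is exactly where late data slips through: a record that lands after the watermark has advanced, but carries an older timestamp, is never selected.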

### Snippet 12-5: Late-Arriving Data Pattern

In [None]:
# Snippet 12-5: Late-Arriving Data Handling

class LateDataHandler:
    """Handle late-arriving data in incremental pipelines."""
    
    def __init__(self, late_arrival_lookback_days: int = 3):
        self.late_arrival_lookback_days = late_arrival_lookback_days
    
    def process_with_late_handling(self, df: pd.DataFrame, 
                                   event_date_col: str,
                                   ingestion_date_col: str,
                                   processing_date: datetime) -> Dict:
        """
        Process data while accounting for late arrivals.
        
        Args:
            df: Input DataFrame
            event_date_col: Column with event timestamp
            ingestion_date_col: Column with ingestion timestamp
            processing_date: Run date; the primary window is the day before it
        
        Returns:
            Dict with primary and late-arriving records
        """
        # Define windows
        primary_start = processing_date - timedelta(days=1)
        primary_end = processing_date
        late_window_start = processing_date - timedelta(days=self.late_arrival_lookback_days)
        
        # Work on a copy so the caller's DataFrame is not mutated
        df = df.copy()
        df[event_date_col] = pd.to_datetime(df[event_date_col])
        df[ingestion_date_col] = pd.to_datetime(df[ingestion_date_col])
        
        # Only records ingested before the run are visible (point-in-time view)
        visible = df[ingestion_date_col] < primary_end
        
        # Primary data: yesterday's events that have already arrived
        primary_mask = (
            visible &
            (df[event_date_col] >= primary_start) &
            (df[event_date_col] < primary_end)
        )
        
        # Late arrivals: older events ingested during the last day
        late_mask = (
            visible &
            (df[event_date_col] >= late_window_start) &
            (df[event_date_col] < primary_start) &
            (df[ingestion_date_col] >= primary_start)
        )
        
        primary_records = df[primary_mask]
        late_records = df[late_mask]
        
        return {
            'primary_records': primary_records,
            'late_records': late_records,
            'primary_count': len(primary_records),
            'late_count': len(late_records),
            'processing_date': processing_date
        }


# Simulate late-arriving data
test_df = transactions_df.copy()
test_df['ingestion_timestamp'] = test_df['transaction_timestamp'] + pd.to_timedelta(
    np.random.choice([0, 1, 2, 3], len(test_df), p=[0.9, 0.05, 0.03, 0.02]), unit='D'
)

handler = LateDataHandler(late_arrival_lookback_days=3)
result = handler.process_with_late_handling(
    test_df,
    event_date_col='transaction_timestamp',
    ingestion_date_col='ingestion_timestamp',
    processing_date=pd.Timestamp('2024-06-15')
)

print(f"Processing date: {result['processing_date']}")
print(f"Primary records (yesterday): {result['primary_count']}")
print(f"Late-arriving records: {result['late_count']}")
if result['late_count'] > 0:
    print(f"\nLate records sample:")
    print(result['late_records'][['transaction_timestamp', 'ingestion_timestamp']].head())
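Once late records are identified, one option is to recompute the daily partitions they belong to rather than patching aggregates in place. That is only safe when partition writes are idempotent, as in Section 12.9. A minimal sketch, assuming the `dt=YYYY-MM-DD` daily partitions used there; `partitions_to_reprocess` is a hypothetical helper:

```python
import pandas as pd


def partitions_to_reprocess(late_records: pd.DataFrame, event_col: str) -> pd.Series:
    """Count late records per daily event partition (dt=YYYY-MM-DD), sorted by date."""
    event_days = pd.to_datetime(late_records[event_col]).dt.strftime("%Y-%m-%d")
    return event_days.value_counts().sort_index()


late = pd.DataFrame({
    "transaction_timestamp": pd.to_datetime(
        ["2024-06-12 10:00", "2024-06-12 18:30", "2024-06-13 09:00"]
    ),
})
print(partitions_to_reprocess(late, "transaction_timestamp"))
```

Each date in the result identifies one partition to rebuild from all of its events, including the newly arrived ones.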

---
## Section 12.8: Airflow DAG Pattern (Reference)

This section shows the TaskFlow API pattern for Airflow 2.4+ DAGs (the `schedule` argument used below was introduced in 2.4). The code is printed rather than executed, since it needs an Airflow environment.

### Snippet 12-6: Modern Airflow DAG Pattern

In [None]:
# Snippet 12-6: Modern Airflow DAG Pattern (Reference Implementation)
# This code demonstrates the pattern - it would run in an Airflow environment

DAG_CODE = '''
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'ml-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

@dag(
    dag_id='automl_data_pipeline',
    default_args=default_args,
    schedule='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['automl', 'data-pipeline'],
)
def automl_pipeline():
    """Modern TaskFlow DAG for AutoML data pipeline."""
    
    @task
    def extract_data(**context):
        """Extract data with point-in-time correctness."""
        # 'logical_date' replaces the deprecated 'execution_date' key (Airflow 2.2+)
        logical_date = context['logical_date']
        # Extract logic here
        return {'path': '/data/extracted.parquet', 'count': 1000}
    
    @task
    def validate_data(extract_result, **context):
        """Validate data quality."""
        # Validation logic here
        return {'valid': True, 'path': extract_result['path']}
    
    @task
    def compute_features(validated_result, **context):
        """Compute features with temporal correctness."""
        # Feature engineering logic here
        return {'features_path': '/data/features.parquet'}
    
    @task
    def check_drift(features_result, **context):
        """Check for data drift."""
        # Drift detection logic here
        return {'drift_detected': False}
    
    # Define flow using TaskFlow
    extracted = extract_data()
    validated = validate_data(extracted)
    features = compute_features(validated)
    drift_check = check_drift(features)

# Instantiate
dag = automl_pipeline()
'''

print("Modern Airflow DAG Pattern (TaskFlow API):")
print("=" * 50)
print(DAG_CODE)
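Because the DAG above is only a string here, its task bodies cannot be exercised directly. One way to keep them testable is to put the logic in plain functions that take the window boundaries as arguments, and let each `@task` only read those boundaries from the context (for example `data_interval_start` and `data_interval_end`, available since Airflow 2.2). Avoiding `datetime.now()` inside tasks also makes retries and backfills deterministic. A minimal sketch with a hypothetical `extract_window` function:

```python
import pandas as pd


def extract_window(events: pd.DataFrame, ts_col: str,
                   start: pd.Timestamp, end: pd.Timestamp) -> pd.DataFrame:
    """Return events in the half-open interval [start, end).

    Hypothetical task body. Inside an Airflow @task it could receive the run's
    data interval; those values are timezone-aware, so the timestamp column
    must use a matching timezone before comparing.
    """
    mask = (events[ts_col] >= start) & (events[ts_col] < end)
    return events.loc[mask].reset_index(drop=True)


events = pd.DataFrame({
    "ts": pd.to_datetime(["2024-06-14 01:00", "2024-06-14 02:00", "2024-06-15 02:00"]),
    "amount": [10.0, 20.0, 30.0],
})
start, end = pd.Timestamp("2024-06-14 02:00"), pd.Timestamp("2024-06-15 02:00")
first_run = extract_window(events, "ts", start, end)
retry = extract_window(events, "ts", start, end)  # a retry sees the same window
print(first_run)
```

The half-open interval means an event exactly at `end` belongs to the next run, so consecutive windows never double-count.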

---
## Section 12.9: Idempotent Operations

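Idempotency is critical for production pipelines: running an operation several times with the same inputs must leave the output in the same state as running it once. Retries (such as `retries=3` in the DAG above) and backfills re-run tasks, so a non-idempotent write can duplicate or corrupt data.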

### Snippet 12-7: Idempotent Write Pattern

In [None]:
# Snippet 12-7: Idempotent Write Pattern

import os
import shutil
import tempfile

class IdempotentWriter:
    """Write data idempotently using atomic operations."""
    
    @staticmethod
    def write_parquet(df: pd.DataFrame, output_path: str) -> str:
        """
        Write DataFrame to parquet idempotently.
        
        Writes to a temp file, then replaces the target in one step, so readers
        never see a partially written file. Re-running with the same data
        leaves the same file in place.
        """
        # Temp file must be on the same filesystem as the target for an atomic replace
        output_dir = os.path.dirname(output_path) or '.'
        os.makedirs(output_dir, exist_ok=True)
        
        temp_path = f"{output_path}.tmp.{os.getpid()}"
        
        try:
            # Write to temp location
            df.to_parquet(temp_path, index=False)
            
            # os.replace overwrites an existing target (atomic on POSIX, same filesystem)
            os.replace(temp_path, output_path)
            
            return output_path
            
        except Exception:
            # Clean up temp file on failure, then re-raise the original error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise
    
    @staticmethod
    def write_partitioned(df: pd.DataFrame, base_path: str,
                         partition_col: str, execution_date: datetime) -> str:
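        """
        Write one partition per execution date (dt=YYYY-MM-DD), so re-running a
        date overwrites only that partition. The path is derived from
        execution_date; partition_col is accepted for readability but unused.
        """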
        partition_value = execution_date.strftime('%Y-%m-%d')
        partition_path = os.path.join(base_path, f"dt={partition_value}")
        output_path = os.path.join(partition_path, "data.parquet")
        
        return IdempotentWriter.write_parquet(df, output_path)


# Demonstrate idempotent write
with tempfile.TemporaryDirectory() as tmpdir:
    # First write, read back immediately
    path1 = IdempotentWriter.write_partitioned(
        customers_df.head(100),
        tmpdir,
        partition_col='execution_date',
        execution_date=datetime(2024, 6, 15)
    )
    df1 = pd.read_parquet(path1)
    print(f"First write: {path1}")
    
    # Second write (same data, same partition = idempotent)
    path2 = IdempotentWriter.write_partitioned(
        customers_df.head(100),
        tmpdir,
        partition_col='execution_date',
        execution_date=datetime(2024, 6, 15)
    )
    df2 = pd.read_parquet(path2)
    print(f"Second write: {path2}")
    
    # Same path, same contents, and no leftover temp files
    print(f"\nSame path: {path1 == path2}")
    print(f"Results identical: {df1.equals(df2)}")
    print(f"Files in partition: {os.listdir(os.path.dirname(path1))}")
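Overwriting a whole partition works when each run owns its partition. For tables updated by key (for example customers, where `customers_df` carries an `updated_at` column), an idempotent alternative is an upsert that keeps the newest version of each key. A minimal in-memory sketch; `upsert` is a hypothetical helper, and it assumes rows with equal key and version are identical (as when the same batch is applied twice):

```python
import pandas as pd


def upsert(existing: pd.DataFrame, batch: pd.DataFrame,
           key: str, version_col: str) -> pd.DataFrame:
    """Merge `batch` into `existing`, keeping the newest version of each key.

    Applying the same batch twice yields the same table, so a retried task
    cannot create duplicate rows.
    """
    combined = pd.concat([existing, batch], ignore_index=True)
    # Newest version sorts last within each key; keep that row
    combined = combined.sort_values([key, version_col])
    return combined.drop_duplicates(subset=[key], keep="last").reset_index(drop=True)


existing = pd.DataFrame({"customer_id": ["C1", "C2"], "version": [1, 1], "value": [100.0, 50.0]})
batch = pd.DataFrame({"customer_id": ["C2", "C3"], "version": [2, 1], "value": [75.0, 10.0]})

once = upsert(existing, batch, key="customer_id", version_col="version")
twice = upsert(once, batch, key="customer_id", version_col="version")  # simulated retry
print(once)
print(f"Idempotent: {once.equals(twice)}")
```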

---
## Section 12.10: Summary

This notebook demonstrated key patterns for building robust data pipelines that support AutoML:

1. **Point-in-Time Correctness** - Always filter features to only use data available before prediction time

2. **Hierarchical Validation** - Run cheap checks first, fail fast on obvious problems

3. **Data Contracts** - Formalize expectations about data schema and quality

4. **Drift Detection** - Monitor for both data drift (distributions) and concept drift (relationships)

5. **Late-Arriving Data** - Handle records that arrive after their logical time window

6. **Modern Airflow Patterns** - Use TaskFlow API and @dag decorator for cleaner code

7. **Idempotent Operations** - Ensure operations produce identical results when re-run

These patterns form the foundation of reliable AutoML data infrastructure.

In [None]:
print("Chapter 12 notebook complete!")
print("\nKey takeaways:")
print("- Point-in-time correctness prevents data leakage")
print("- Hierarchical validation catches issues early and cheaply")
print("- Data contracts formalize data expectations")
print("- Drift detection enables proactive model maintenance")
print("- Idempotency ensures reliable pipeline operations")