# Lab 7: End-to-End Pipeline Integration - Integrating Airflow with the Data Ecosystem

## 🎯 Objectives
- Integrate Airflow with Kafka to ingest streaming data
- Orchestrate Spark jobs with Airflow
- Connect to databases (PostgreSQL, MongoDB)
- Build a complete data pipeline from source to destination
- Implement error handling and recovery strategies
- Set up monitoring and alerting for pipelines
- Apply best practices for production pipelines

## 📋 Prerequisites
- Completed Labs 1-6
- Kafka Lab is running (optional)
- Spark Lab is running (optional)
- Understand the concepts from the previous labs

## 🏗️ Pipeline Architecture Overview
```
┌─────────┐     ┌──────────┐     ┌─────────┐     ┌──────────┐
│  Kafka  │────▶│ Airflow  │────▶│  Spark  │────▶│ Database │
│ (Source)│     │(Orchestr)│     │(Process)│     │  (Dest)  │
└─────────┘     └──────────┘     └─────────┘     └──────────┘
     │                │                │                │
     └────────────────┴────────────────┴────────────────┘
                    Airflow Orchestrates All
```


## 1. Import Libraries and Setup


In [None]:
# Import Airflow and integration modules
from airflow.sdk import DAG, dag, task  # the cells below use the @dag decorator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Kafka integration
try:
    from kafka import KafkaConsumer, KafkaProducer
    KAFKA_AVAILABLE = True
except ImportError:
    KAFKA_AVAILABLE = False
    print("⚠️  kafka-python not available. Install with: pip install kafka-python")

# Spark integration
try:
    from pyspark.sql import SparkSession
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False
    print("⚠️  pyspark not available. Install with: pip install pyspark")

import pendulum
from datetime import datetime, timedelta
import json
import os

print("✅ Integration modules imported successfully!")
print(f"Kafka available: {KAFKA_AVAILABLE}")
print(f"Spark available: {SPARK_AVAILABLE}")
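
The try/except pattern above imports each optional package in order to detect it. As a hedged alternative, the sketch below checks availability without importing anything, using the standard library's `importlib.util.find_spec`. The helper name `is_available` is an assumption made for illustration and is not part of Airflow.

```python
from importlib.util import find_spec


def is_available(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return find_spec(module_name) is not None


# "kafka" and "pyspark" are the import names used in the setup cell
optional_modules = ["kafka", "pyspark"]
availability = {name: is_available(name) for name in optional_modules}
for name, found in availability.items():
    print(f"{name} available: {found}")
```

This only works as written for top-level module names; a dotted name makes `find_spec` import the parent package first.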


## 2. ETL Pipeline with Multiple Data Sources

Create a complete ETL pipeline with multiple data sources and destinations.


In [None]:
# Complete ETL Pipeline
@dag(
    dag_id="complete_etl_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["etl", "pipeline", "integration"],
)
def complete_etl_pipeline():
    """
    ### Complete ETL Pipeline
    End-to-end ETL pipeline with multiple sources and destinations.
    """
    
    @task
    def extract_from_api(**context):
        """Extract data from API"""
        execution_date = context['ds']
        print(f"Extracting data from API for date: {execution_date}")
        
        # Simulate API call
        data = {
            "date": execution_date,
            "records": [
                {"id": 1, "name": "Product A", "sales": 1000},
                {"id": 2, "name": "Product B", "sales": 2000},
                {"id": 3, "name": "Product C", "sales": 1500},
            ],
            "source": "api",
        }
        
        # Save to temporary file
        output_path = f"/tmp/airflow_data/api_data_{execution_date}.json"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)
        
        print(f"Extracted {len(data['records'])} records")
        return output_path
    
    @task
    def extract_from_file(**context):
        """Extract data from file system"""
        execution_date = context['ds']
        print(f"Extracting data from files for date: {execution_date}")
        
        # Simulate file extraction
        data = {
            "date": execution_date,
            "records": [
                {"id": 4, "name": "Product D", "sales": 800},
                {"id": 5, "name": "Product E", "sales": 1200},
            ],
            "source": "file",
        }
        
        output_path = f"/tmp/airflow_data/file_data_{execution_date}.json"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, 'w') as f:
            json.dump(data, f, indent=2)
        
        print(f"Extracted {len(data['records'])} records")
        return output_path
    
    @task
    def transform_data(api_path: str, file_path: str):
        """Transform and merge data from multiple sources"""
        print("Transforming and merging data...")
        
        # Load data from both sources
        with open(api_path, 'r') as f:
            api_data = json.load(f)
        
        with open(file_path, 'r') as f:
            file_data = json.load(f)
        
        # Merge and transform
        all_records = api_data['records'] + file_data['records']
        
        # Calculate statistics
        total_sales = sum(record['sales'] for record in all_records)
        avg_sales = total_sales / len(all_records) if all_records else 0
        
        transformed = {
            "date": api_data['date'],
            "total_records": len(all_records),
            "total_sales": total_sales,
            "average_sales": avg_sales,
            "records": all_records,
        }
        
        # Save transformed data
        output_path = f"/tmp/airflow_data/transformed_{api_data['date']}.json"
        with open(output_path, 'w') as f:
            json.dump(transformed, f, indent=2)
        
        print(f"Transformed {transformed['total_records']} records")
        print(f"Total sales: {transformed['total_sales']}")
        return output_path
    
    @task
    def load_to_database(transformed_path: str, **context):
        """Load transformed data to database"""
        execution_date = context['ds']
        print(f"Loading data to database for date: {execution_date}")
        
        # Load transformed data
        with open(transformed_path, 'r') as f:
            data = json.load(f)
        
        # In practice, would insert into PostgreSQL
        print(f"Would insert {data['total_records']} records to database")
        print(f"Total sales: {data['total_sales']}")
        
        # Simulate database insert
        return {
            "status": "loaded",
            "records_inserted": data['total_records'],
            "date": execution_date,
        }
    
    @task
    def generate_report(load_result: dict):
        """Generate summary report"""
        print("=" * 60)
        print("ETL Pipeline Report")
        print("=" * 60)
        print(f"Date: {load_result['date']}")
        print(f"Status: {load_result['status']}")
        print(f"Records Inserted: {load_result['records_inserted']}")
        print("=" * 60)
        return "Report generated"
    
    # Define workflow
    api_data = extract_from_api()
    file_data = extract_from_file()
    transformed_data = transform_data(api_data, file_data)
    load_result = load_to_database(transformed_data)
    generate_report(load_result)

# Create DAG
complete_etl_pipeline_instance = complete_etl_pipeline()

print("✅ Complete ETL Pipeline DAG created!")
print(f"Tasks: {[task.task_id for task in complete_etl_pipeline_instance.tasks]}")
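
The merge-and-summarize logic inside `transform_data` can be checked without a running Airflow instance. The sketch below pulls the same arithmetic into a plain function; `summarize_sales` is a hypothetical name, and the sample records are the ones used in the extract tasks above.

```python
def summarize_sales(*record_batches: list[dict]) -> dict:
    """Merge record batches and compute the same statistics as transform_data."""
    all_records = [record for batch in record_batches for record in batch]
    total_sales = sum(record["sales"] for record in all_records)
    average_sales = total_sales / len(all_records) if all_records else 0
    return {
        "total_records": len(all_records),
        "total_sales": total_sales,
        "average_sales": average_sales,
        "records": all_records,
    }


api_records = [
    {"id": 1, "name": "Product A", "sales": 1000},
    {"id": 2, "name": "Product B", "sales": 2000},
    {"id": 3, "name": "Product C", "sales": 1500},
]
file_records = [
    {"id": 4, "name": "Product D", "sales": 800},
    {"id": 5, "name": "Product E", "sales": 1200},
]

summary = summarize_sales(api_records, file_records)
print(summary["total_records"], summary["total_sales"], summary["average_sales"])
# prints: 5 6500 1300.0
```

Keeping the transformation in a plain function like this is one way to unit-test it separately from the task that does the file I/O.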


## 3. Kafka Integration - Stream Data Ingestion

Integrate Airflow with Kafka to ingest streaming data.


In [None]:
# Kafka Integration Pipeline
if KAFKA_AVAILABLE:
    @dag(
        dag_id="kafka_integration_pipeline",
        schedule="@hourly",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        catchup=False,
        tags=["kafka", "streaming", "integration"],
    )
    def kafka_integration_pipeline():
        """
        ### Kafka Integration Pipeline
        Ingest data from Kafka and process it with Airflow.
        """
        
        def consume_kafka_messages(**context):
            """Consume messages from a Kafka topic"""
            kafka_bootstrap_servers = "localhost:9092"
            topic_name = "stock-data"
            
            print(f"Consuming messages from Kafka topic: {topic_name}")
            
            try:
                consumer = KafkaConsumer(
                    topic_name,
                    bootstrap_servers=kafka_bootstrap_servers,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                    auto_offset_reset='latest',
                    consumer_timeout_ms=5000,  # Stop iterating after 5 seconds without messages
                )
                
                messages = []
                for message in consumer:
                    messages.append(message.value)
                    if len(messages) >= 10:  # Limit the number of messages
                        break
                
                consumer.close()
                
                print(f"Consumed {len(messages)} messages from Kafka")
                
                # Save messages
                output_path = "/tmp/airflow_data/kafka_messages.json"
                os.makedirs(os.path.dirname(output_path), exist_ok=True)
                with open(output_path, 'w') as f:
                    json.dump(messages, f, indent=2)
                
                return output_path
                
            except Exception as e:
                print(f"⚠️  Kafka consumption failed: {e}")
                print("💡 Make sure Kafka is running: docker compose up -d (in Kafka_lab)")
                # Return None instead of a file path so the pipeline does not fail
                return None
        
        def process_kafka_data(file_path: str):
            """Process data from Kafka"""
            if not file_path or not os.path.exists(file_path):
                print("No Kafka data to process")
                return {"status": "skipped", "records": 0}
            
            with open(file_path, 'r') as f:
                messages = json.load(f)
            
            # Process messages
            total_volume = sum(msg.get('volume', 0) for msg in messages)
            avg_price = sum(msg.get('close', 0) for msg in messages) / len(messages) if messages else 0
            
            result = {
                "status": "processed",
                "total_messages": len(messages),
                "total_volume": total_volume,
                "average_price": avg_price,
            }
            
            print(f"Processed Kafka data: {result}")
            return result
        
        # Tasks
        consume_task = PythonOperator(
            task_id="consume_kafka_messages",
            python_callable=consume_kafka_messages,
        )
        
        process_task = PythonOperator(
            task_id="process_kafka_data",
            python_callable=process_kafka_data,
            op_args=[consume_task.output],  # Pass the output of the previous task
        )
        
        consume_task >> process_task
    
    kafka_pipeline = kafka_integration_pipeline()
    print("✅ Kafka Integration Pipeline DAG created!")
else:
    print("⚠️  Kafka integration DAG not created (kafka-python not available)")
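
The consume task above reads a bounded batch (at most 10 messages) and then aggregates it. The sketch below shows the same "take a bounded batch, then aggregate" pattern against any iterable, so it can be tried without a broker. `take_batch` and `fake_stream` are hypothetical names; with a real consumer, the assumption is that you would pass something like `(m.value for m in consumer)` instead.

```python
from itertools import islice
from typing import Iterable, Iterator


def take_batch(messages: Iterable[dict], max_messages: int) -> list[dict]:
    """Collect at most max_messages items, stopping early if the source runs dry."""
    return list(islice(messages, max_messages))


def fake_stream() -> Iterator[dict]:
    """Hypothetical stand-in for a Kafka consumer yielding decoded message values."""
    for i in range(25):
        yield {"symbol": "TEST", "close": 100 + i, "volume": 10}


batch = take_batch(fake_stream(), max_messages=10)

# Same aggregation as process_kafka_data
total_volume = sum(msg.get("volume", 0) for msg in batch)
average_price = sum(msg.get("close", 0) for msg in batch) / len(batch) if batch else 0
print(len(batch), total_volume, average_price)
# prints: 10 100 104.5
```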


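## 4. Database Integration - PostgreSQL

Integrate Airflow with PostgreSQL to create tables, load data, and run report queries.


In [None]:
# Database Integration Pipeline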
@dag(
    dag_id="database_integration_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["database", "postgresql", "integration"],
)
def database_integration_pipeline():
    """
    ### Database Integration Pipeline
    Integrates with PostgreSQL to perform database operations.
    
    Note: Set up the PostgreSQL connection in the Airflow UI:
    - Connection ID: postgres_default
    - Connection Type: Postgres
    - Host: postgres
    - Schema: airflow
    - Login: airflow
    - Password: airflow
    - Port: 5432
    """
    
    # Task 1: Create table
    create_table = SQLExecuteQueryOperator(
        task_id="create_sales_table",
        conn_id="postgres_default",
        sql="""
        CREATE TABLE IF NOT EXISTS daily_sales (
            id SERIAL PRIMARY KEY,
            date DATE NOT NULL,
            product_name VARCHAR(100),
            sales_amount DECIMAL(10, 2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(date, product_name)
        );
        """,
    )
    
    # Task 2: Insert data with PostgresHook
    def insert_sales_data(**context):
        """Insert sales data into the database"""
        execution_date = context['ds']
        
        # Sample data
        sales_data = [
            {"product": "Product A", "amount": 1000.00},
            {"product": "Product B", "amount": 2000.00},
            {"product": "Product C", "amount": 1500.00},
        ]
        
        hook = PostgresHook(postgres_conn_id="postgres_default")
        
        # Bind values as query parameters instead of formatting them into the SQL string
        sql = """
            INSERT INTO daily_sales (date, product_name, sales_amount)
            VALUES (%s, %s, %s)
            ON CONFLICT (date, product_name) DO UPDATE
            SET sales_amount = EXCLUDED.sales_amount;
        """
        for sale in sales_data:
            hook.run(sql, parameters=(execution_date, sale['product'], sale['amount']))
        
        print(f"Inserted {len(sales_data)} records for date: {execution_date}")
        return len(sales_data)
    
    insert_task = PythonOperator(
        task_id="insert_sales_data",
        python_callable=insert_sales_data,
    )
    
    # Task 3: Generate a report from the database
    generate_report = SQLExecuteQueryOperator(
        task_id="generate_sales_report",
        conn_id="postgres_default",
        sql="""
        SELECT 
            date,
            COUNT(*) as product_count,
            SUM(sales_amount) as total_sales,
            AVG(sales_amount) as avg_sales
        FROM daily_sales
        WHERE date = '{{ ds }}'
        GROUP BY date;
        """,
    )
    
    # Define dependencies
    create_table >> insert_task >> generate_report

# Create DAG
database_pipeline = database_integration_pipeline()

print("✅ Database Integration Pipeline DAG created!")
print(f"Tasks: {[task.task_id for task in database_pipeline.tasks]}")
print("\n💡 Note: Setup PostgreSQL connection in Airflow UI before running")
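
The insert task relies on `ON CONFLICT ... DO UPDATE` so that re-running a day does not create duplicate rows. The sketch below demonstrates that upsert behaviour with bound parameters, using the standard library's `sqlite3` as a stand-in for PostgreSQL. This is an assumption made so the example runs anywhere: SQLite 3.24 or newer is required, the column types are simplified, and the `?` placeholders would typically be `%s` with a PostgreSQL driver.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE daily_sales (
        date TEXT NOT NULL,
        product_name TEXT,
        sales_amount REAL,
        UNIQUE(date, product_name)
    )
    """
)

upsert_sql = """
INSERT INTO daily_sales (date, product_name, sales_amount)
VALUES (?, ?, ?)
ON CONFLICT (date, product_name) DO UPDATE
SET sales_amount = excluded.sales_amount
"""

rows = [
    ("2024-01-01", "Product A", 1000.0),
    ("2024-01-01", "Product B", 2000.0),
    ("2024-01-01", "Product A", 1250.0),  # same key: updates instead of duplicating
]
with conn:  # commits on success, rolls back on error
    conn.executemany(upsert_sql, rows)

result = conn.execute(
    "SELECT product_name, sales_amount FROM daily_sales ORDER BY product_name"
).fetchall()
print(result)
# prints: [('Product A', 1250.0), ('Product B', 2000.0)]
```

Because the values are bound rather than interpolated, a product name containing a quote character cannot alter the statement.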


## 5. Error Handling and Recovery Strategies

Implement error handling and recovery strategies for production pipelines.


In [None]:
# Pipeline with Error Handling
@dag(
    dag_id="error_handling_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": None,  # A callback function can be added here
    },
    tags=["error-handling", "recovery"],
)
def error_handling_pipeline():
    """
    ### Error Handling Pipeline
    Pipeline with comprehensive error handling and recovery.
    """
    
    from airflow.providers.standard.operators.empty import EmptyOperator
    
    start = EmptyOperator(task_id="start")
    
    def extract_with_retry(**context):
        """Extract with retry logic"""
        import random
        
        # Simulate potential failure
        if random.random() < 0.3:  # 30% chance of failure
            raise Exception("Extraction failed - will retry")
        
        print("Extraction successful")
        return {"status": "extracted", "records": 100}
    
    extract_task = PythonOperator(
        task_id="extract_data",
        python_callable=extract_with_retry,
        retries=3,
        retry_delay=timedelta(minutes=2),
    )
    
    def validate_data(**context):
        """Validate extracted data"""
        ti = context['ti']
        extracted = ti.xcom_pull(task_ids='extract_data')
        
        if not extracted or extracted.get('records', 0) == 0:
            raise ValueError("No data extracted - validation failed")
        
        print(f"Validation passed: {extracted['records']} records")
        return extracted
    
    validate_task = PythonOperator(
        task_id="validate_data",
        python_callable=validate_data,
    )
    
    def process_data(**context):
        """Process data"""
        ti = context['ti']
        data = ti.xcom_pull(task_ids='validate_data')
        print(f"Processing {data['records']} records")
        return {"status": "processed"}
    
    process_task = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
    )
    
    # Error handling task
    def handle_failure(**context):
        """Handle failures"""
        print("Handling failure - sending alerts, cleaning up...")
        return "Failure handled"
    
    handle_failure_task = PythonOperator(
        task_id="handle_failure",
        python_callable=handle_failure,
        trigger_rule="one_failed",  # Runs if any upstream task failed
    )
    
    # Cleanup task: always runs
    def cleanup(**context):
        """Cleanup resources"""
        print("Cleaning up temporary files and resources...")
        return "Cleanup completed"
    
    cleanup_task = PythonOperator(
        task_id="cleanup",
        python_callable=cleanup,
        trigger_rule="all_done",  # Always runs once upstream tasks finish
    )
    
    end = EmptyOperator(
        task_id="end",
        trigger_rule="all_done",
    )
    
    # Define dependencies
    start >> extract_task >> validate_task >> process_task >> end
    [extract_task, validate_task, process_task] >> handle_failure_task >> cleanup_task
    cleanup_task >> end

# Create DAG
error_handling_pipeline_instance = error_handling_pipeline()

print("✅ Error Handling Pipeline DAG created!")
print(f"Tasks: {[task.task_id for task in error_handling_pipeline_instance.tasks]}")
print("\n💡 Error Handling Strategies:")
print("  - Retries with a retry_delay between attempts")
print("  - Validation tasks to catch errors early")
print("  - Failure handling tasks with trigger_rule='one_failed'")
print("  - Cleanup tasks with trigger_rule='all_done'")
print("  - Callbacks for notifications")
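
The DAG above leaves `on_failure_callback` set to `None`. As a minimal sketch of what such a callback might look like, the function below builds an alert message from the context dictionary. It assumes the context carries a `task_instance` entry with `task_id`, `dag_id`, and `try_number`, plus an `exception` entry on failure. The `fake_context` is hypothetical and exists only so the function can be exercised outside Airflow.

```python
from types import SimpleNamespace


def notify_failure(context: dict) -> str:
    """Build an alert message from the callback context (actual sending is left out)."""
    ti = context["task_instance"]
    error = context.get("exception")
    message = (
        f"Task {ti.task_id} in DAG {ti.dag_id} failed "
        f"on try {ti.try_number}: {error!r}"
    )
    print(message)  # a real callback would forward this to email, Slack, etc.
    return message


# Hypothetical context for a dry run outside Airflow
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="extract_data", dag_id="error_handling_pipeline", try_number=2
    ),
    "exception": ValueError("Extraction failed"),
}
alert = notify_failure(fake_context)
```

To wire it in, the function would replace `None` in `default_args["on_failure_callback"]`.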


## 6. Monitoring and Alerting

Implement monitoring and alerting for pipelines to track health and performance.


In [None]:
# Pipeline with Monitoring
@dag(
    dag_id="monitoring_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["monitoring", "alerting"],
)
def monitoring_pipeline():
    """
    ### Monitoring Pipeline
    Pipeline with monitoring and alerting capabilities.
    """
    
    @task
    def track_start(**context):
        """Track pipeline start"""
        execution_date = context['ds']
        dag_run_id = context['dag_run'].run_id
        
        metrics = {
            "dag_run_id": dag_run_id,
            "execution_date": execution_date,
            "start_time": datetime.now().isoformat(),
            "status": "started",
        }
        
        print(f"Pipeline started: {metrics}")
        return metrics
    
    @task
    def process_with_metrics(**context):
        """Process with metrics tracking"""
        start_time = datetime.now()
        
        # Simulate processing
        records_processed = 1000
        processing_time = (datetime.now() - start_time).total_seconds()
        
        metrics = {
            "records_processed": records_processed,
            "processing_time_seconds": processing_time,
            "throughput": records_processed / processing_time if processing_time > 0 else 0,
        }
        
        print(f"Processing metrics: {metrics}")
        return metrics
    
    @task
    def check_health(**context):
        """Health check"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='process_with_metrics')
        
        # Health checks
        health_status = {
            "status": "healthy",
            "checks": {
                "records_processed": metrics['records_processed'] > 0,
                "processing_time": metrics['processing_time_seconds'] < 60,
                "throughput": metrics['throughput'] > 10,
            }
        }
        
        # Check if all health checks pass
        if not all(health_status['checks'].values()):
            health_status['status'] = "unhealthy"
            print(f"⚠️  Health check failed: {health_status}")
        else:
            print(f"✅ Health check passed: {health_status}")
        
        return health_status
    
    @task
    def send_alerts(health_status: dict, **context):
        """Send alerts if there are problems"""
        if health_status['status'] == "unhealthy":
            print("🚨 Sending alert: Pipeline unhealthy!")
            print(f"Failed checks: {[k for k, v in health_status['checks'].items() if not v]}")
            # In practice, this would send an email/Slack/PagerDuty notification
        else:
            print("✅ No alerts needed - pipeline healthy")
        
        return "Alerts sent"
    
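    # Define workflow
    start_metrics = track_start()
    process_metrics = process_with_metrics()
    health_status = check_health()
    # check_health pulls XCom from process_with_metrics, so order the tasks explicitly
    start_metrics >> process_metrics >> health_status
    send_alerts(health_status)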

# Create DAG
monitoring_pipeline_instance = monitoring_pipeline()

print("✅ Monitoring Pipeline DAG created!")
print(f"Tasks: {[task.task_id for task in monitoring_pipeline_instance.tasks]}")
print("\n💡 Monitoring Features:")
print("  - Track pipeline start/end times")
print("  - Collect processing metrics")
print("  - Health checks")
print("  - Alerting for failures")
print("  - Performance monitoring")
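
The threshold checks in `check_health` can be exercised on their own. The sketch below restates them as a plain function with the same three checks and the same limits (60 seconds, throughput above 10) exposed as parameters. The name `evaluate_health` and the sample metrics are assumptions made for illustration.

```python
def evaluate_health(
    metrics: dict, max_seconds: float = 60, min_throughput: float = 10
) -> dict:
    """Apply the same three checks as check_health and list the ones that failed."""
    checks = {
        "records_processed": metrics["records_processed"] > 0,
        "processing_time": metrics["processing_time_seconds"] < max_seconds,
        "throughput": metrics["throughput"] > min_throughput,
    }
    failed = [name for name, passed in checks.items() if not passed]
    status = "healthy" if not failed else "unhealthy"
    return {"status": status, "checks": checks, "failed": failed}


good = evaluate_health(
    {"records_processed": 1000, "processing_time_seconds": 2.0, "throughput": 500.0}
)
slow = evaluate_health(
    {"records_processed": 1000, "processing_time_seconds": 120.0, "throughput": 8.3}
)
print(good["status"], slow["status"], slow["failed"])
# prints: healthy unhealthy ['processing_time', 'throughput']
```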


## 7. Complete Data Pipeline - End-to-End Example

Create a complete data pipeline that integrates all components.


In [None]:
# Complete End-to-End Pipeline
@dag(
    dag_id="complete_data_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["complete", "end-to-end", "production"],
)
def complete_data_pipeline():
    """
    ### Complete Data Pipeline
    End-to-end data pipeline that integrates all components:
    - Data extraction from multiple sources
    - Data transformation
    - Data validation
    - Data loading
    - Monitoring and alerting
    """
    
    from airflow.sdk import TaskGroup
    
    # Start
    start = EmptyOperator(task_id="start")
    
    # Extraction Group
    with TaskGroup("extraction_group") as extraction_group:
        @task
        def extract_api():
            """Extract from API"""
            print("Extracting from API...")
            return {"source": "api", "records": 100}
        
        @task
        def extract_files():
            """Extract from files"""
            print("Extracting from files...")
            return {"source": "files", "records": 50}
        
        api_data = extract_api()
        file_data = extract_files()
    
    # Transformation Group
    with TaskGroup("transformation_group") as transformation_group:
        @task
        def merge_data(api_data: dict, file_data: dict):
            """Merge data from multiple sources"""
            total_records = api_data['records'] + file_data['records']
            print(f"Merged {total_records} records")
            return {"total_records": total_records}
        
        @task
        def transform_data(merged_data: dict):
            """Transform data"""
            print(f"Transforming {merged_data['total_records']} records")
            return {"status": "transformed", **merged_data}
        
        merged = merge_data(api_data, file_data)
        transformed = transform_data(merged)
    
    # Validation
    @task
    def validate_data(transformed_data: dict):
        """Validate transformed data"""
        if transformed_data['total_records'] > 0:
            print("✅ Validation passed")
            return transformed_data
        else:
            raise ValueError("Validation failed: No records")
    
    # transformed is the output of transform_data inside transformation_group.
    # validate_data must be called here, before the loading group that consumes its result.
    validated = validate_data(transformed)
    
    # Loading Group
    with TaskGroup("loading_group") as loading_group:
        @task
        def load_to_database(validated_data: dict):
            """Load into the database"""
            print(f"Loading {validated_data['total_records']} records to database")
            return {"status": "loaded"}
        
        @task
        def load_to_file(validated_data: dict):
            """Load into a file"""
            print(f"Loading {validated_data['total_records']} records to file")
            return {"status": "saved"}
        
        # Load tasks: validated is passed in from outside the group
        db_result = load_to_database(validated)
        file_result = load_to_file(validated)
    
    # Monitoring
    @task
    def generate_report(db_result: dict, file_result: dict, **context):
        """Generate final report"""
        execution_date = context['ds']
        print("=" * 60)
        print("Pipeline Execution Report")
        print("=" * 60)
        print(f"Date: {execution_date}")
        print(f"Database Status: {db_result['status']}")
        print(f"File Status: {file_result['status']}")
        print("=" * 60)
        return "Report generated"
    
    # End
    end = EmptyOperator(
        task_id="end",
        trigger_rule="all_done",
    )
    
    # Define workflow
    report = generate_report(db_result, file_result)
    start >> extraction_group >> transformation_group >> validated >> loading_group >> report >> end

# Create DAG
complete_pipeline_instance = complete_data_pipeline()

print("✅ Complete Data Pipeline DAG created!")
print(f"Tasks: {[task.task_id for task in complete_pipeline_instance.tasks]}")
print("\n📊 Pipeline Structure:")
print("  start → extraction_group → transformation_group → validate → loading_group → report → end")
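
The printed pipeline structure is a dependency chain, and the order in which stages may run follows from it. As a rough illustration that is independent of Airflow's own scheduling, the sketch below encodes the same stage-level dependencies and derives a valid order with the standard library's `graphlib` (Python 3.9+). The node names mirror the stages above; the dictionary is hand-written for this example.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of stages it depends on (its upstream stages)
upstream = {
    "extraction_group": {"start"},
    "transformation_group": {"extraction_group"},
    "validate_data": {"transformation_group"},
    "loading_group": {"validate_data"},
    "generate_report": {"loading_group"},
    "end": {"generate_report"},
}

order = list(TopologicalSorter(upstream).static_order())
print(" -> ".join(order))
# prints: start -> extraction_group -> transformation_group -> validate_data -> loading_group -> generate_report -> end
```

Adding a dependency that closes a loop (for example making `start` depend on `end`) raises `graphlib.CycleError`, which is the same reason a DAG must stay acyclic.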


## 8. Best Practices for Production Pipelines

### ✅ Production Best Practices:

1. **Error Handling:**
   - Set appropriate retries and retry_delay
   - Implement validation tasks
   - Use trigger rules for cleanup
   - Add failure callbacks

2. **Monitoring:**
   - Track metrics and performance
   - Implement health checks
   - Set up alerting
   - Monitor resource usage

3. **Data Quality:**
   - Validate data at every stage
   - Check data completeness
   - Verify data schema
   - Handle missing data

4. **Performance:**
   - Use appropriate executors
   - Optimize task dependencies
   - Process in parallel when possible
   - Cache intermediate results

5. **Security:**
   - Use Airflow Connections for credentials
   - Don't hardcode secrets
   - Use Variables for config
   - Implement access controls

6. **Maintainability:**
   - Use TaskGroups to organize tasks
   - Add docstrings and comments
   - Version control DAGs
   - Document dependencies
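
The data quality items above (schema verification, missing data) can be sketched as a small check that runs before loading. This is a minimal illustration, not a replacement for a dedicated data quality tool. `REQUIRED_FIELDS` and `find_invalid_records` are hypothetical names, and the schema mirrors the sample sales records used earlier in this lab.

```python
# Hypothetical schema: field name -> accepted type(s)
REQUIRED_FIELDS = {"id": int, "name": str, "sales": (int, float)}


def find_invalid_records(records: list[dict]) -> list[tuple[int, str]]:
    """Return (index, reason) pairs for records that break the schema."""
    problems = []
    for index, record in enumerate(records):
        for field, expected_type in REQUIRED_FIELDS.items():
            if field not in record or record[field] is None:
                problems.append((index, f"missing {field}"))
            elif not isinstance(record[field], expected_type):
                problems.append((index, f"bad type for {field}"))
    return problems


records = [
    {"id": 1, "name": "Product A", "sales": 1000},
    {"id": 2, "name": "Product B"},  # sales is missing
    {"id": "3", "name": "Product C", "sales": 1500},  # id is a string
]
problems = find_invalid_records(records)
print(problems)
# prints: [(1, 'missing sales'), (2, 'bad type for id')]
```

A validation task could raise an error when `problems` is non-empty, so that downstream load tasks do not run on bad data.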

### ⚠️ Common Production Issues:

1. **Resource exhaustion**: Too many concurrent tasks
2. **Data quality issues**: Missing validation
3. **Long-running tasks**: Blocking scheduler
4. **Memory issues**: Large data in XCom
5. **Network issues**: External service failures
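
For the XCom memory issue, the ETL pipeline earlier in this lab already shows the usual workaround: write the data to storage and pass only the path between tasks. The sketch below isolates that pattern. `write_payload` and `read_payload` are hypothetical helpers, and a local temporary directory stands in for storage, which assumes producer and consumer share a filesystem; a distributed deployment would need shared or object storage instead.

```python
import json
import os
import tempfile


def write_payload(records: list[dict], directory: str) -> str:
    """Persist records to disk and return only the path, which is small enough for XCom."""
    path = os.path.join(directory, "records.json")
    with open(path, "w") as f:
        json.dump(records, f)
    return path


def read_payload(path: str) -> list[dict]:
    """Load the records back in the downstream step."""
    with open(path) as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp_dir:
    big_records = [{"id": i, "sales": i * 10} for i in range(10_000)]
    path = write_payload(big_records, tmp_dir)  # this string is what a task would return
    restored = read_payload(path)

print(len(restored))
# prints: 10000
```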


## 9. Summary and Conclusion

### ✅ What you learned across the entire Airflow Lab Series:

**Lab 1: Airflow Basics**
- Airflow architecture
- Web UI and CLI
- REST API

**Lab 2: DAGs and Tasks**
- Task SDK (@dag, @task decorators)
- Task dependencies
- Error handling

**Lab 3: Operators and Hooks**
- BashOperator, PythonOperator, SQLExecuteQueryOperator
- Hooks for external systems
- Comparing @task vs Operators

**Lab 4: Task Dependencies and Branching**
- Bitshift operators
- BranchPythonOperator
- Trigger rules
- Dynamic task mapping
- TaskGroups

**Lab 5: XCom and Data Sharing**
- XCom push/pull
- Task return values
- Data passing best practices

**Lab 6: Scheduling and Timetables**
- Cron expressions
- Timedelta scheduling
- Custom timetables
- Catchup and data intervals

**Lab 7: End-to-End Pipeline Integration**
- Kafka integration
- Database operations
- Error handling and recovery
- Monitoring and alerting
- Complete production pipelines

### 🎯 Key Takeaways:

1. **Airflow is an orchestration tool**, not a processing engine
2. **Task SDK** is the modern approach, but **Operators** are still important
3. **XCom** is only for small data; use file storage for large data
4. **Scheduling** is more complex than cron: understand data intervals
5. **Error handling** is critical for production
6. **Monitoring** helps maintain healthy pipelines

### 📚 Next Steps:

1. **Deploy to Production:**
   - Set up Airflow in a production environment
   - Configure executors (Celery, Kubernetes)
   - Set up monitoring and alerting

2. **Advanced Topics:**
   - Custom operators and hooks
   - Airflow plugins
   - Dynamic DAG generation
   - Multi-tenant deployments

3. **Integration:**
   - Cloud services (AWS, GCP, Azure)
   - Data quality tools (Great Expectations)
   - Transformation tools (dbt)
   - ML pipelines (MLflow, Kubeflow)

### 🔗 Useful Resources:

- [Airflow Documentation](https://airflow.apache.org/docs/apache-airflow/3.1.1/)
- [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/3.1.1/best-practices/)
- [Airflow Providers](https://airflow.apache.org/docs/apache-airflow-providers/)
- [Airflow GitHub](https://github.com/apache/airflow)
- [Airflow Slack Community](https://apache-airflow.slack.com)

### 🎉 Congratulations!

You have completed the entire Airflow Lab Series! You now have the knowledge to:
- Design and implement data pipelines with Airflow
- Integrate Airflow with other systems
- Deploy and maintain production pipelines
- Troubleshoot and optimize pipelines

**Happy Orchestrating! 🚀**
