# In [1]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
import pandas as pd
import os

def extract():
    """Fetch the last seven days of operations rows as a pandas DataFrame.

    Runs a fixed query against the ``operations`` table through the
    ``operations_db`` Airflow connection and returns whatever
    ``PostgresHook.get_pandas_df`` produces for it. The ``transform`` task
    later pulls this return value from XCom.
    """
    recent_rows_sql = """
        SELECT date, region, total_orders, fulfilled_orders 
        FROM operations 
        WHERE date >= CURRENT_DATE - INTERVAL '7 days'
    """
    operations_db = PostgresHook(postgres_conn_id='operations_db')
    return operations_db.get_pandas_df(recent_rows_sql)

def transform(**context):
    """Add fulfillment metrics to the DataFrame produced by ``extract``.

    Pulls the upstream frame from XCom (task id ``extract``) and adds two
    columns to it in place:

    * ``fulfillment_rate`` -- ``fulfilled_orders / total_orders``
    * ``is_underperforming`` -- ``True`` where the rate is below 0.85

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.

    Returns:
        The same DataFrame object that was pulled, with both columns added.
    """
    frame = context['ti'].xcom_pull(task_ids='extract')

    rate = frame['fulfilled_orders'].div(frame['total_orders'])
    frame['fulfillment_rate'] = rate
    # The flag is derived from the stored column, mirroring the data flow above.
    frame['is_underperforming'] = frame['fulfillment_rate'].lt(0.85)
    return frame

def generate_report(**context):
    """Render the transformed DataFrame to a dated HTML file.

    Ensures the relative ``reports`` directory exists, pulls the frame
    pushed by the ``transform`` task, and writes it with
    ``DataFrame.to_html`` to ``reports/report_<today>.html``.

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.

    Returns:
        The relative path of the HTML file that was written.
    """
    # The directory is created before the XCom pull, so it exists even if
    # the pull itself fails.
    os.makedirs('reports', exist_ok=True)
    frame = context['ti'].xcom_pull(task_ids='transform')

    # NOTE(review): the file name uses the wall-clock date, not the run's
    # logical date, so re-running an older DAG run writes under today's
    # name -- confirm this is intended.
    today = datetime.now().date()
    destination = f"reports/report_{today}.html"
    frame.to_html(destination)
    return destination

# Task-level defaults handed to the DAG below: a failed task is retried up to
# two times, waiting five minutes between attempts.
default_args = dict(
    owner='airflow',
    retries=2,
    retry_delay=timedelta(minutes=5),
)

# Daily DAG that runs extract -> transform -> generate_report in sequence.
with DAG(
    'ops_monitoring',
    default_args=default_args,
    schedule_interval='@daily',
    # NOTE(review): Airflow's guidance is to use a fixed start_date rather
    # than a rolling one; kept as-is to avoid changing scheduling behavior.
    start_date=days_ago(1),
    # Do not backfill runs for intervals missed before the DAG was enabled.
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )

    # `provide_context=True` is intentionally not passed: in Airflow 2 the
    # task context is supplied automatically to callables that accept
    # `**context`, and the argument is deprecated.
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )

    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
    )

    # Each task consumes the previous task's XCom return value.
    extract_task >> transform_task >> report_task

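# Cell output: ModuleNotFoundError: No module named 'airflow'
# (Airflow is not installed in the environment that ran this cell; the imports
# above need the `apache-airflow` and `apache-airflow-providers-postgres` packages.)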