<a href="https://colab.research.google.com/github/orokgospel/monthly_weekend_trip_metrics_dag/blob/main/Dag_weekend_trip_metrics_monthly_report.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# @title
# ============================================================
#   AIRFLOW DAG: Weekend Report ETL with ClickHouse (Secure)
#   Loads credentials from .env, performs ETL, sends email
# ============================================================

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import requests
import yagmail
import traceback
import os
from dotenv import load_dotenv
from io import StringIO

# ------------------------------------------------------------
# 1. LOAD ENVIRONMENT VARIABLES (.env FILE)
# ------------------------------------------------------------
# Populate os.environ from a .env file so the helpers below can read the
# CLICKHOUSE_* and EMAIL_* settings through os.getenv(). This is a top-level
# call, so it runs on every import of this module (presumably every time the
# Airflow scheduler parses this DAG file).
load_dotenv()  # This loads CLICKHOUSE_* and EMAIL_* automatically

# ------------------------------------------------------------
# 2. HELPER: Connect to ClickHouse HTTP Endpoint
# ------------------------------------------------------------
def clickhouse_query(sql: str, timeout: float = 60) -> str:
    """
    Execute SQL against ClickHouse using its HTTP interface.

    The endpoint URL and credentials are read from the environment
    (CLICKHOUSE_HOST, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD), so nothing
    secret lives in this file.

    Args:
        sql: Statement text, sent verbatim (UTF-8) as the POST body. For an
            INSERT the body may also carry the data rows after the query line.
        timeout: Seconds to wait for the HTTP request (default 60).

    Returns:
        The raw response body as text.

    Raises:
        RuntimeError: if CLICKHOUSE_HOST is not set.
        requests.HTTPError: if ClickHouse answers with a 4xx/5xx status; the
            message includes ClickHouse's own error text from the body.
    """
    host = os.getenv("CLICKHOUSE_HOST")
    if not host:
        # Without this check requests fails with an opaque
        # "Invalid URL 'None'" error that hides the missing configuration.
        raise RuntimeError("CLICKHOUSE_HOST is not set; cannot reach ClickHouse")

    response = requests.post(
        host,
        data=sql.encode("utf-8"),
        auth=(os.getenv("CLICKHOUSE_USER"), os.getenv("CLICKHOUSE_PASSWORD")),
        headers={"Content-Type": "text/plain"},
        timeout=timeout,
    )
    if not response.ok:
        # raise_for_status() only reports the status line; ClickHouse puts the
        # actual error (syntax error, unknown table, ...) in the body.
        raise requests.HTTPError(
            f"ClickHouse HTTP {response.status_code}: {response.text[:2000]}",
            response=response,
        )
    return response.text


def clickhouse_query_dataframe(sql: str) -> pd.DataFrame:
    """
    Run a SELECT on ClickHouse and return the result as a pandas DataFrame.

    The query is wrapped as a subquery and re-emitted in ClickHouse's
    ``CSVWithNames`` format (header row followed by CSV rows), which is then
    parsed with ``pandas.read_csv``.

    Args:
        sql: A single SELECT statement. Surrounding whitespace and trailing
            semicolons are stripped first, because a ``;`` inside the
            wrapping subquery would be a syntax error.

    Returns:
        DataFrame whose columns are named after the query's result columns.
    """
    inner = sql.strip().rstrip(";").rstrip()
    csv_sql = f"SELECT * FROM ({inner}) FORMAT CSVWithNames"
    output = clickhouse_query(csv_sql)
    return pd.read_csv(StringIO(output))

# ------------------------------------------------------------
# 3. HELPER: SAFE EMAIL SENDER
# ------------------------------------------------------------
def send_email_safe(subject, message):
    """
    Send an email notification via yagmail (Gmail App Password), best-effort.

    Credentials and recipient are read from EMAIL_USER, EMAIL_PASSWORD and
    EMAIL_TO. This function NEVER breaks the DAG — any error is printed and
    suppressed.

    Args:
        subject: Email subject line.
        message: Email body text.
    """
    user = os.getenv("EMAIL_USER")
    password = os.getenv("EMAIL_PASSWORD")
    to_addr = os.getenv("EMAIL_TO")

    try:
        # The context manager closes the SMTP connection even if send()
        # raises; previously the connection was never closed.
        with yagmail.SMTP(user, password) as yag:
            yag.send(to_addr, subject, message)
        print("Email sent successfully")
    except Exception as e:  # deliberate best-effort: notification must not fail the task
        print(f"Email sending failed: {e}")

# ------------------------------------------------------------
# 4. SQL QUERY TO RUN
# ------------------------------------------------------------
# Monthly weekend metrics over 2014-2016, one row per calendar month (1-12).
# toDayOfWeek() uses Monday=1 ... Sunday=7 by default, so 6 selects
# Saturdays and 7 Sundays.
# NOTE(review): despite the *_mean_trip_count names, countIf() yields the
# TOTAL number of trips on that weekday in that calendar month summed over
# all three years, not a per-day mean. The names are kept because the
# destination table schema uses them.
# The upper bound is exclusive ('< 2017-01-01'). With '<= 2016-12-31', a
# DateTime column is compared against 2016-12-31 00:00:00, which drops
# almost the whole last day.
SQL_QUERY = """
SELECT
  toMonth(tpep_pickup_datetime) AS month_num,
  countIf(toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_trip_count,
  avgIf(fare_amount, toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_fare_per_trip,
  avgIf(trip_duration, toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_duration_per_trip,

  countIf(toDayOfWeek(tpep_pickup_datetime) = 7) AS sun_mean_trip_count,
  avgIf(fare_amount, toDayOfWeek(tpep_pickup_datetime) = 7) AS sun_mean_fare_per_trip,
  avgIf(trip_duration, toDayOfWeek(tpep_pickup_datetime) = 7) AS sun_mean_duration_per_trip

FROM tripdata
WHERE tpep_pickup_datetime >= '2014-01-01'
  AND tpep_pickup_datetime < '2017-01-01'
GROUP BY toMonth(tpep_pickup_datetime)
ORDER BY toMonth(tpep_pickup_datetime)
"""

# ClickHouse table that receives the report rows.
DESTINATION_TABLE = "weekend_monthly_report"

# ------------------------------------------------------------
# 5. ETL TASK FUNCTIONS
# ------------------------------------------------------------

def extract_and_validate(**context):
    """
    Pull the monthly weekend report from ClickHouse and sanity-check it.

    Checks performed:
    - the result has at least one row;
    - every ``month_num`` lies in the inclusive range 1..12.

    On success the DataFrame is serialised with ``DataFrame.to_json()`` and
    pushed to XCom under the key ``"result_df"``.

    Raises:
        ValueError: if either check fails.
    """
    print("Running ClickHouse query...")
    result = clickhouse_query_dataframe(SQL_QUERY)

    if result.empty:
        raise ValueError("❌ Query returned NO DATA!")

    months_in_range = result["month_num"].between(1, 12).all()
    if not months_in_range:
        raise ValueError("❌ Invalid month_num detected!")

    payload = result.to_json()
    task_instance = context['ti']
    task_instance.xcom_push(key="result_df", value=payload)
    print("Validation successful.")


def create_destination_table():
    """
    Create the ClickHouse report table unless it already exists.

    Relies on ``CREATE TABLE IF NOT EXISTS``, so repeated runs are harmless.
    An existing table is left untouched; its schema is not altered.
    """
    ddl = f"""
    CREATE TABLE IF NOT EXISTS {DESTINATION_TABLE} (
        month_num UInt8,
        sat_mean_trip_count UInt32,
        sat_mean_fare_per_trip Float64,
        sat_mean_duration_per_trip Float64,
        sun_mean_trip_count UInt32,
        sun_mean_fare_per_trip Float64,
        sun_mean_duration_per_trip Float64
    ) ENGINE = MergeTree()
    ORDER BY month_num;
    """
    clickhouse_query(ddl)
    print("Destination table ensured.")


def load_results(**context):
    """
    Replace the destination table's contents with the validated results.

    Pulls the JSON-serialised DataFrame pushed to XCom (key ``"result_df"``)
    by the ``extract_and_validate`` task. It then truncates the destination
    table and inserts the rows using ClickHouse's ``CSVWithNames`` input
    format.

    The table is truncated first because the report is fully recomputed on
    every run; appending would add another duplicate copy of every row each
    time the DAG runs.
    NOTE: TRUNCATE and INSERT are separate statements, so a failure between
    them leaves the table empty until the next successful run.

    Raises:
        ValueError: if no upstream result is found in XCom.
    """
    # Name the producing task explicitly instead of matching any task's XCom.
    df_json = context['ti'].xcom_pull(task_ids="extract_and_validate", key="result_df")
    if not df_json:
        raise ValueError("No 'result_df' found in XCom from extract_and_validate")

    # Wrapping in StringIO: passing literal JSON text to read_json is
    # deprecated in recent pandas versions.
    df = pd.read_json(StringIO(df_json))

    clickhouse_query(f"TRUNCATE TABLE IF EXISTS {DESTINATION_TABLE}")

    insert_sql = (
        f"INSERT INTO {DESTINATION_TABLE} FORMAT CSVWithNames\n"
        + df.to_csv(index=False)
    )
    clickhouse_query(insert_sql)
    print("Insert completed.")


def notify_success():
    """Send the best-effort 'ETL succeeded' email."""
    subject = "ETL SUCCESS: Weekend Trip Report"
    body = "ETL job completed successfully and data is available in ClickHouse."
    send_email_safe(subject, body)


def notify_failure(context):
    """
    Email failure details. Registered as the DAG's ``on_failure_callback``.

    Airflow calls this with a context dict. If the dict holds an exception
    under ``"exception"``, its full traceback goes into the email. A plain
    string there is used as-is. Otherwise (e.g. a DAG-level failure context,
    which may carry only a ``"reason"``) the reason text is used.

    Args:
        context: Airflow callback context dict.
    """
    exc = context.get("exception")
    if isinstance(exc, BaseException):
        # The third argument must be the traceback object; passing the
        # exception there (as before) makes the callback itself raise.
        error_info = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    elif exc:
        error_info = str(exc)
    else:
        error_info = str(context.get("reason") or "No exception details available.")

    send_email_safe(
        "❌ ETL FAILED",
        f"ETL Pipeline failed!\n\nError Details:\n{error_info}"
    )

# ------------------------------------------------------------
# 6. AIRFLOW DAG DEFINITION
# ------------------------------------------------------------
with DAG(
    dag_id="clickhouse_weekend_report_etl_secure",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    # DAG-level callback: invoked when a DAG run fails.
    on_failure_callback=notify_failure,
    default_args={"owner": "gospel"}
) as dag:

    # No `provide_context`: Airflow 2 removed that argument. The context is
    # passed automatically to callables that accept **kwargs, and BaseOperator
    # rejects unknown arguments with an AirflowException at parse time.
    extract_task = PythonOperator(
        task_id="extract_and_validate",
        python_callable=extract_and_validate,
    )

    create_table_task = PythonOperator(
        task_id="create_destination_table",
        python_callable=create_destination_table,
    )

    load_task = PythonOperator(
        task_id="load_results",
        python_callable=load_results,
    )

    success_email_task = PythonOperator(
        task_id="notify_success",
        python_callable=notify_success,
    )

    # DAG WORKFLOW: extract -> ensure table -> load -> success email
    extract_task >> create_table_task >> load_task >> success_email_task


ModuleNotFoundError: No module named 'airflow'

In [2]:
# @title
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from clickhouse_driver import Client
import psycopg2
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging

# ---------------------------------------------------------
# DAG DEFAULT SETTINGS
# ---------------------------------------------------------
# Applied to every task: one retry, five minutes after a failure.
default_args = dict(
    owner="airflow",
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Daily schedule from 2023-01-01; catchup disabled, so missed intervals
# are not backfilled.
dag = DAG(
    dag_id="weekend_trip_clickhouse_etl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
)

# ---------------------------------------------------------
# ETL PARAMETERS
# ---------------------------------------------------------
# Saturday-only (toDayOfWeek = 6) monthly metrics for 2014-2016; toYear()
# BETWEEN is inclusive at both ends. Returns one row per calendar month.
# NOTE(review): sat_mean_trip_count is countIf(), i.e. the TOTAL number of
# Saturday trips in that calendar month summed over all three years, not a
# per-Saturday mean — confirm the intended metric.
# The SELECT column order must match the column list that
# extract_from_clickhouse() assigns to the result rows.
SQL_QUERY = """
SELECT
    toMonth(tpep_pickup_datetime) AS month_num,
    countIf(toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_trip_count,
    avgIf(fare_amount, toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_fare_per_trip,
    avgIf(trip_duration, toDayOfWeek(tpep_pickup_datetime) = 6) AS sat_mean_trip_duration
FROM yellow_taxi_trips
WHERE toYear(tpep_pickup_datetime) BETWEEN 2014 AND 2016
GROUP BY month_num
ORDER BY month_num;
"""

# Schema-qualified Postgres table that receives the report rows.
DEST_TABLE = "public.weekend_trip_report"


# ---------------------------------------------------------
# DATABASE FUNCTIONS
# ---------------------------------------------------------

def extract_from_clickhouse():
    """
    Run ``SQL_QUERY`` on ClickHouse and write the result to a CSV file.

    Connection details come from the Airflow Connection
    ``clickhouse_default`` (host, port, login, password; schema is used as
    the database), so no credentials live in this file.

    Returns:
        Path of the CSV written (``/tmp/weekend_trip_report.csv``).

    Raises:
        ValueError: if the query returns no rows.
    """
    from airflow.hooks.base import BaseHook

    conn = BaseHook.get_connection("clickhouse_default")
    client = Client(
        host=conn.host,
        port=conn.port,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
    )
    try:
        data = client.execute(SQL_QUERY)
    finally:
        # Release the server connection even if the query fails.
        client.disconnect()

    # Column order mirrors the SELECT list in SQL_QUERY.
    df = pd.DataFrame(
        data,
        columns=["month_num", "sat_mean_trip_count",
                 "sat_mean_fare_per_trip", "sat_mean_trip_duration"],
    )
    if df.empty:
        raise ValueError("Query returned no data!")

    csv_path = "/tmp/weekend_trip_report.csv"
    df.to_csv(csv_path, index=False)
    logging.info("Extraction Successful — CSV saved.")
    return csv_path


def load_into_postgres(csv_path):
    """
    Replace the Postgres report table's contents with the rows in *csv_path*.

    Connection details come from the Airflow Connection ``postgres_dest``.
    The table is created if missing, emptied, and refilled in ONE
    transaction. A failed run therefore rolls back to the previous contents,
    and a repeated run does not append duplicate rows.

    Args:
        csv_path: CSV with columns month_num, sat_mean_trip_count,
            sat_mean_fare_per_trip, sat_mean_trip_duration.
    """
    from airflow.hooks.base import BaseHook

    columns = ["month_num", "sat_mean_trip_count",
               "sat_mean_fare_per_trip", "sat_mean_trip_duration"]

    df = pd.read_csv(csv_path)[columns]
    # astype(object) turns numpy scalars into plain Python int/float, which
    # psycopg2 can adapt (it cannot adapt numpy.int64); NaN becomes NULL.
    rows = list(
        df.astype(object)
        .where(df.notna(), None)
        .itertuples(index=False, name=None)
    )

    conn = BaseHook.get_connection("postgres_dest")
    pg = psycopg2.connect(
        host=conn.host,
        port=conn.port,
        user=conn.login,
        password=conn.password,
        database=conn.schema
    )
    try:
        # `with pg` commits on success and rolls back on error. It does not
        # close the connection, hence the outer try/finally.
        with pg, pg.cursor() as cur:
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {DEST_TABLE} (
                    month_num INT,
                    sat_mean_trip_count BIGINT,
                    sat_mean_fare_per_trip FLOAT,
                    sat_mean_trip_duration FLOAT
                );
            """)
            # Full refresh: the report is recomputed from scratch each run.
            cur.execute(f"TRUNCATE TABLE {DEST_TABLE}")
            cur.executemany(f"""
                INSERT INTO {DEST_TABLE}
                (month_num, sat_mean_trip_count, sat_mean_fare_per_trip, sat_mean_trip_duration)
                VALUES (%s, %s, %s, %s)
            """, rows)
    finally:
        pg.close()

    logging.info("Load Successful — Data inserted into Postgres.")


# ---------------------------------------------------------
# OPTIONAL EMAIL (DISABLED TO AVOID ERRORS)
# ---------------------------------------------------------

def send_email_success():
    """
    Placeholder success notification that only logs that email was skipped.

    Real sending is disabled to avoid Gmail SMTP 535 (authentication)
    errors. A template for re-enabling it, with credentials read from the
    environment, is kept in the comments below.
    """
    # Disabled template. It needs `import os`. Gmail also requires an App
    # Password. NOTE: cell 1 reads the password from EMAIL_PASSWORD, not
    # EMAIL_PASS.
    #
    #   email_user = os.getenv("EMAIL_USER")
    #   email_pass = os.getenv("EMAIL_PASS")
    #
    #   msg = MIMEMultipart()
    #   msg["From"] = email_user
    #   msg["To"] = "team@example.com"
    #   msg["Subject"] = "ETL SUCCESS"
    #   msg.attach(MIMEText("Weekend ETL Completed Successfully.", "plain"))
    #
    #   server = smtplib.SMTP("smtp.gmail.com", 587)
    #   server.starttls()
    #   server.login(email_user, email_pass)  # fails without an App Password
    #   server.send_message(msg)
    #   server.quit()
    logging.info("Email notification skipped to avoid SMTP errors.")


# ---------------------------------------------------------
# DAG TASK DEFINITIONS
# ---------------------------------------------------------

def etl_pipeline():
    """
    Run the whole ETL inside a single Airflow task.

    Steps, in order: extract from ClickHouse to CSV, load that CSV into
    Postgres, then call the success-notification step.
    """
    extracted_csv = extract_from_clickhouse()
    load_into_postgres(extracted_csv)
    send_email_success()
    logging.info("ETL Pipeline Completed Successfully.")


# Register the end-to-end pipeline as a task on `dag`.
etl_task = PythonOperator(
    dag=dag,
    task_id="run_weekend_trip_etl",
    python_callable=etl_pipeline,
)


ModuleNotFoundError: No module named 'airflow'