# Подготовка

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
import os
from typing import List, Tuple

In [None]:
# DAG parameters: task-level defaults handed to the DAG as ``default_args``.
default_args = dict(
    owner='data_engineer',
    depends_on_past=False,
    start_date=datetime(2025, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    # One retry, five minutes after the failed attempt.
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Daily DAG that all tasks below attach to; catchup is disabled.
dag = DAG(
    dag_id='etl_customer_orders',
    description='DAG для загрузки исходных данных',
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    tags=['etl', 'analytics'],
)


# Шаг 1

In [None]:
def load_data_to_table(table_name: str, file_name: str) -> None:
    """Reload a Postgres table from a CSV file in the working directory.

    The target table is truncated and then filled with every row of
    ``./<file_name>.csv``. The CSV column names are used as the target
    column names of the insert.

    Args:
        table_name: Table to truncate and fill. The name is interpolated
            into the SQL text as-is, so it must come from trusted code.
        file_name: CSV file name without the ``.csv`` extension.
    """
    file_path = f'./{file_name}.csv'

    df = pd.read_csv(file_path)
    # Missing CSV cells are read as NaN. Cast to object and swap NaN for None
    # so that they are stored as SQL NULL instead of a NaN value.
    cleaned = df.astype(object).where(df.notna(), None)
    rows = list(cleaned.itertuples(index=False, name=None))

    hook = PostgresHook(postgres_conn_id='postgres_default')
    # The hook opens and commits its own connections for run() and
    # insert_rows(), so no separately managed connection is needed here.
    hook.run(f'TRUNCATE TABLE {table_name};')
    # One batched call instead of one insert_rows() call per row.
    hook.insert_rows(
        table=table_name,
        rows=rows,
        target_fields=df.columns.tolist(),
    )

    
## Load tasks: each one reloads a single table from its CSV file
## through load_data_to_table.

## task: customers.csv -> customer
load_customers = PythonOperator(
    dag=dag,
    task_id='load_customers',
    python_callable=load_data_to_table,
    op_kwargs=dict(table_name='customer', file_name='customers'),
)

## task: products.csv -> product
load_products = PythonOperator(
    dag=dag,
    task_id='load_products',
    python_callable=load_data_to_table,
    op_kwargs=dict(table_name='product', file_name='products'),
)

## task: orders.csv -> orders
load_orders = PythonOperator(
    dag=dag,
    task_id='load_orders',
    python_callable=load_data_to_table,
    op_kwargs=dict(table_name='orders', file_name='orders'),
)

## task: order_items.csv -> order_items
load_order_items = PythonOperator(
    dag=dag,
    task_id='load_order_items',
    python_callable=load_data_to_table,
    op_kwargs=dict(table_name='order_items', file_name='order_items'),
)

# Шаг 2

In [None]:
def run_analysis_query(query: str, output_file: str):
    """Run an analysis query and save its result set as a CSV file.

    The result is written to ``/data/output/<output_file>.csv`` with the
    query's column names as the header row.

    Args:
        query: SQL text to execute against the ``postgres_default`` connection.
        output_file: Name of the CSV file to write, without the extension.

    Returns:
        The number of rows the query returned (used for the result check).
    """
    from contextlib import closing

    hook = PostgresHook(postgres_conn_id='postgres_default')
    # Use a cursor directly so the column names from cursor.description can
    # be kept; a bare list of records would produce a "0,1,2,..." CSV header.
    with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute(query)
        columns = [column[0] for column in cursor.description]
        results = cursor.fetchall()

    output_dir = '/data/output'
    # Create the target directory on first use instead of failing in to_csv.
    os.makedirs(output_dir, exist_ok=True)

    df = pd.DataFrame(results, columns=columns)
    df.to_csv(f'{output_dir}/{output_file}.csv', index=False)
    return len(results)  # row count, returned for the result check

In [None]:
# Query 1: the three customers with the smallest and the three customers with
# the largest total transaction amount.
# Customers without orders are kept by the left joins and get a total of 0.
# Each ORDER BY ... LIMIT lives in its own subquery: PostgreSQL rejects
# ORDER BY/LIMIT placed directly before UNION ALL without such wrapping.
query_top_transactions = """
with customer_totals as (
    select
        c.customer_id,
        c.first_name,
        c.last_name,
        coalesce(sum(oi.quantity * oi.item_list_price_at_sale), 0) as total_spent
    from customer c
    left join orders o on c.customer_id = o.customer_id
    left join order_items oi on o.order_id = oi.order_id
    group by c.customer_id, c.first_name, c.last_name
)
select first_name, last_name, total_spent
from (
    select first_name, last_name, total_spent
    from customer_totals
    order by total_spent asc
    limit 3
) as lowest_spenders
union all
select first_name, last_name, total_spent
from (
    select first_name, last_name, total_spent
    from customer_totals
    order by total_spent desc
    limit 3
) as highest_spenders;
"""

## task: run query 1 and store its result as top_transactions.csv
analyze_top_transactions = PythonOperator(
    dag=dag,
    task_id='analyze_top_transactions',
    python_callable=run_analysis_query,
    op_kwargs=dict(
        query=query_top_transactions,
        output_file='top_transactions',
    ),
)

# Query 2: top 5 customers by total revenue within each wealth segment.
# - customer_revenue: per-customer sum of quantity * item_list_price_at_sale;
#   the left joins keep customers without orders and coalesce gives them 0.
# - ranked_customers: numbers customers inside each wealth_segment by
#   total_revenue, highest first. row_number() gives tied customers distinct
#   positions, so each segment yields at most five rows.
# The final select keeps positions 1-5, ordered by segment and position.
query_wealth_segment = """
with customer_revenue as (
    select
        c.customer_id,
        c.first_name,
        c.last_name,
        c.wealth_segment,
        coalesce(sum(oi.quantity * oi.item_list_price_at_sale), 0) as total_revenue
    from customer c
    left join orders o on c.customer_id = o.customer_id
    left join order_items oi on o.order_id = oi.order_id
    group by c.customer_id, c.first_name, c.last_name, c.wealth_segment
),
ranked_customers as (
    select
        first_name,
        last_name,
        wealth_segment,
        total_revenue,
        row_number() over (partition by wealth_segment order by total_revenue desc) as rank
    from customer_revenue
)
select
    first_name,
    last_name,
    wealth_segment,
    total_revenue
from ranked_customers
where rank <= 5
order by wealth_segment, rank;
"""

## task: run query 2 and store its result as wealth_segment_top.csv
analyze_wealth_segment = PythonOperator(
    dag=dag,
    task_id='analyze_wealth_segment',
    python_callable=run_analysis_query,
    op_kwargs=dict(
        query=query_wealth_segment,
        output_file='wealth_segment_top',
    ),
)

# Шаг 3

In [None]:
def check_results(**context):
    """Fail when an analysis query returned no rows.

    Looks at the task instances of the current DAG run and, for the two
    analysis tasks, pulls the row count each of them returned via XCom.

    Args:
        **context: Airflow task context; ``context['dag_run']`` must provide
            ``get_task_instances()``.

    Raises:
        ValueError: If at least one analysis task reported zero rows. Raising
            (rather than only printing) marks this task as failed, so the
            empty result is visible in the DAG run state.
    """
    analysis_task_ids = {'analyze_top_transactions', 'analyze_wealth_segment'}

    # A missing XCom value (None) is not treated as an empty result; only an
    # explicit row count of 0 is.
    empty_tasks = [
        ti.task_id
        for ti in context['dag_run'].get_task_instances()
        if ti.task_id in analysis_task_ids
        and ti.xcom_pull(task_ids=ti.task_id) == 0
    ]

    if empty_tasks:
        raise ValueError(
            f'Ошибка: запросы не вернули ни одной строки: {empty_tasks}'
        )

# Шаг 4

In [None]:
### Не очень понятно, что подразумевалось под успешным/неуспешным выполнением.
### Поскольку проверка на ненулевое количество строк уже реализована в шаге 3,
### здесь сделал общую проверку, что таски успешно завершились


def log_dag_status(**context):
    """Print whether any task instance of the current DAG run has failed.

    Reads ``dag_run`` from the task context, collects the ids of all task
    instances whose state equals ``'failed'`` and prints either the list of
    those ids or a success message.
    """
    run = context.get('dag_run')

    # Gather the ids of the failed task instances.
    failed_ids = []
    for instance in run.get_task_instances():
        if instance.state == 'failed':
            failed_ids.append(instance.task_id)

    if not failed_ids:
        print("DAG успешно выполнен: все задачи завершены без ошибок.")
        return

    print(f'DAG завершён с ошибкой. Провалились задачи: {failed_ids}')

## Status-check task: reports whether any task of the run has failed.
## 'all_done' makes it wait until every upstream task has finished, whatever
## the outcome; 'always' ignores upstream tasks entirely, so the report could
## be produced before the other tasks have run.
## NOTE(review): no upstream dependencies for this task are set in the part of
## the file visible here - confirm it is wired after the other tasks.
status_logger = PythonOperator(
    task_id='log_dag_status',
    python_callable=log_dag_status,
    trigger_rule='all_done',
    dag=dag,
)