In [None]:
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
from cosmos import DbtTaskGroup
from datetime import datetime
import pandas as pd

# Função para processar dados extraídos da API
def process_api_data(**kwargs):
    ti = kwargs['ti']
    print("[LOG] Iniciando processamento dos dados da API...")
    api_data = ti.xcom_pull(task_ids='extract_api_data')
    print(f"[LOG] Dados extraídos da API: {api_data}")
    df = pd.DataFrame(api_data['results'])
    print(f"[LOG] DataFrame criado com {len(df)} linhas")
    df.to_csv('/path/to/data/api_data.csv', index=False)
    print("[LOG] Dados da API salvos em /path/to/data/api_data.csv")

# DAG principal
with DAG(
    'cosmos_multi_source_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Etapa 1: Extrai dados de uma API
    print("[LOG] Configurando extração da API...")
    extract_api_data = SimpleHttpOperator(
        task_id='extract_api_data',
        method='GET',
        http_conn_id='api_conn',
        endpoint='data_endpoint',
        do_xcom_push=True
    )

    # Etapa 2: Extrai dados de um banco de dados SQL
    print("[LOG] Configurando extração do banco SQL...")
    extract_sql_data = PostgresOperator(
        task_id='extract_sql_data',
        postgres_conn_id='postgres_conn',
        sql="""
        COPY (SELECT * FROM sales_data WHERE date = CURRENT_DATE)
        TO '/path/to/data/sql_data.csv' CSV HEADER;
        """
    )

    # Etapa 3: Processa os dados da API
    print("[LOG] Configurando processamento dos dados da API...")
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=process_api_data
    )

    # Etapa 4: Usa Cosmos para rodar transformações com dbt
    print("[LOG] Configurando transformações dbt com Cosmos...")
    dbt_transforms = DbtTaskGroup(
        group_id='dbt_transformations',
        dir='/path/to/dbt_project',
        profile_args={"target": "prod"},
    )

    # Etapa 5: Carrega os dados no Data Warehouse
    print("[LOG] Configurando carga de dados no Data Warehouse...")
    load_data = PostgresOperator(
        task_id='load_data',
        postgres_conn_id='warehouse_conn',
        sql="""
        COPY transformed_data FROM '/path/to/dbt_project/target/output.csv' CSV HEADER;
        """
    )

    # Define a ordem das tarefas
    print("[LOG] Definindo ordem das tarefas...")
    [extract_api_data, extract_sql_data] >> process_data >> dbt_transforms >> load_data

    print("[LOG] DAG configurada com sucesso!")
