## Estrutura de uma DAG no Airflow

Uma **DAG (Directed Acyclic Graph)** no Airflow define um pipeline de tarefas. É composta por um conjunto de tarefas e as dependências entre elas. A DAG determina a ordem e as condições em que as tarefas devem ser executadas.

As principais estruturas de uma DAG incluem:

- **`dag_id`**: Um identificador único para a DAG.
- **`schedule_interval`**: Define a frequência com que a DAG deve ser executada (diariamente, semanalmente, etc.).
- **`default_args`**: Argumentos padrões, como o horário de início, dependências, e se a DAG deve ser retroativa.
- **`tasks`**: Tarefas que são executadas dentro da DAG.
- **Dependências entre tarefas**: Definem a ordem de execução entre as tarefas.

---

**Exemplo básico de uma DAG:**

1. Definir a DAG e seus argumentos básicos.
2. Criar tarefas individuais.
3. Definir dependências entre as tarefas.


In [None]:
# Exemplo de DAG simples no Airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

# Definindo os argumentos padrão da DAG
default_args = {
    'owner': 'maicon',
    'depends_on_past': False,
    'start_date': datetime(2023, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Definindo a DAG
with DAG(
    dag_id='exemplo_dag',
    default_args=default_args,
    description='Exemplo de uma DAG simples',
    schedule_interval=timedelta(days=1),
    catchup=False,  # Não executa execuções passadas
) as dag:

    # Criando as tarefas
    tarefa_inicial = DummyOperator(task_id='inicio')
    tarefa_final = DummyOperator(task_id='fim')

    # Definindo as dependências
    tarefa_inicial >> tarefa_final  # 'inicio' deve rodar antes de 'fim'


## Principais Operadores no Airflow

No Airflow, os **operadores** são os blocos de construção usados para definir tarefas dentro de um DAG (Directed Acyclic Graph). Cada operador representa uma única tarefa e define o que deve ser executado. Os operadores podem ser divididos em várias categorias, dependendo de suas funcionalidades, como executar comandos Bash, funções Python, transferir dados ou executar consultas SQL.

Principais operadores no Airflow:
1. **BashOperator**: Executa comandos ou scripts bash diretamente na linha de comando.
2. **PythonOperator**: Executa funções Python. É útil para executar lógica Python personalizada dentro de um DAG.
3. **EmailOperator**: Envia emails. Geralmente usado para notificar falhas ou sucessos de tarefas.
4. **PostgresOperator**: Executa comandos SQL em um banco de dados Postgres.
5. **MySqlOperator**: Executa comandos SQL em um banco de dados MySQL.
6. **S3ToGCSOperator**: Transfere arquivos do AWS S3 para o Google Cloud Storage (GCS).
7. **SimpleHttpOperator**: Faz solicitações HTTP. Útil para chamadas de API.
8. **DummyOperator**: Representa uma tarefa "sem operação", geralmente usada para testes ou como marcadores em um DAG.


In [None]:
# Exemplo de DAG usando diferentes operadores no Airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Função Python a ser usada no PythonOperator
def print_hello():
    print("Hello from Python!")

# Definindo o DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}
dag = DAG(
    'exemplo_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

# Criando tarefas com diferentes operadores

# BashOperator - Executa um comando Bash
bash_task = BashOperator(
    task_id='executar_script_bash',
    bash_command='echo "Hello Airflow!"',
    dag=dag
)

# PythonOperator - Executa uma função Python
python_task = PythonOperator(
    task_id='executar_funcao_python',
    python_callable=print_hello,
    dag=dag
)

# DummyOperator - Não faz nada, usado como placeholder
dummy_task = DummyOperator(
    task_id='tarefa_placeholder',
    dag=dag
)

# Definindo dependências entre as tarefas
bash_task >> python_task >> dummy_task


## Regras de Gatilho (Trigger Rules) no Airflow

As **Regras de Gatilho** (Trigger Rules) no Airflow controlam quando uma tarefa deve ser executada com base no status das tarefas predecessoras. Por padrão, uma tarefa só será executada se todas as tarefas anteriores forem bem-sucedidas (trigger rule `all_success`). No entanto, há outras regras que podem ser configuradas, permitindo maior flexibilidade no controle do fluxo de trabalho.

Principais regras de gatilho:
1. **all_success** (padrão): Executa a tarefa somente se todas as predecessoras tiverem sucesso.
2. **all_failed**: Executa a tarefa somente se todas as predecessoras falharem.
3. **all_done**: Executa a tarefa independentemente do status das predecessoras (sucesso ou falha).
4. **one_success**: Executa a tarefa se pelo menos uma predecessora for bem-sucedida.
5. **one_failed**: Executa a tarefa se pelo menos uma predecessora falhar.
6. **none_failed**: Executa a tarefa se nenhuma predecessora falhar (independente de estarem em sucesso ou ainda em execução).
7. **none_skipped**: Executa a tarefa se nenhuma predecessora for pulada.
8. **dummy**: Usada principalmente para testes, a tarefa será executada de forma imediata.


In [None]:
# Exemplo de DAG com diferentes regras de gatilho no Airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Definindo o DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}
dag = DAG(
    'exemplo_regras_gatilho',
    default_args=default_args,
    schedule_interval='@daily',
)

# Definindo tarefas
tarefa1 = DummyOperator(
    task_id='tarefa_1',
    dag=dag
)

tarefa2 = DummyOperator(
    task_id='tarefa_2',
    dag=dag
)

# Tarefa 3 só será executada se uma das predecessoras for bem-sucedida
tarefa3 = DummyOperator(
    task_id='tarefa_3',
    trigger_rule='one_success',
    dag=dag
)

# Tarefa 4 será executada independentemente do resultado das predecessoras
tarefa4 = DummyOperator(
    task_id='tarefa_4',
    trigger_rule='all_done',
    dag=dag
)

# Definindo dependências
tarefa1 >> tarefa3
tarefa2 >> tarefa3
tarefa3 >> tarefa4


## DAGs Complexas no Airflow

Uma **DAG complexa** no Airflow é composta por várias tarefas que podem ter interdependências complexas e lógicas de execução variadas. Essas DAGs envolvem múltiplos operadores, regras de gatilho personalizadas e podem ter uma grande quantidade de ramificações. A complexidade pode surgir quando se trabalha com muitas tarefas que têm diferentes condições de execução, como dependências de várias tarefas predecessoras ou regras de execução diferentes.


Elementos comuns em DAGs complexas:
1. **Dependências múltiplas**: Uma tarefa pode depender de várias tarefas predecessoras e pode ter várias tarefas sucessoras.
2. **Ramificação condicional**: Usar o `BranchPythonOperator` para decidir qual caminho seguir com base em uma condição.
3. **SubDAGs**: Utilizados para encapsular partes de um DAG dentro de um sub-DAG.
4. **Trigger Rules complexas**: Uso avançado de regras de gatilho, como `one_failed` ou `all_done`, para controlar a execução.
5. **Parallelismo**: Execução de tarefas em paralelo quando não há dependências entre elas.
6. **Cross-DAG dependencies**: Dependências entre DAGs diferentes, onde a execução de uma tarefa depende do sucesso ou falha em outra DAG.

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Definindo o DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}
dag = DAG(
    'dag_complexa_exemplo',
    default_args=default_args,
    schedule_interval='@daily',
)

# Função para ramificação
def escolher_caminho():
    # Condição para ramificação (exemplo simples)
    return 'task_a' if True else 'task_b'

# Tarefas do DAG
inicio = DummyOperator(
    task_id='inicio',
    dag=dag
)

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=escolher_caminho,
    dag=dag
)

# Definindo tarefas que seguem a ramificação
task_a = DummyOperator(
    task_id='task_a',
    dag=dag
)

task_b = DummyOperator(
    task_id='task_b',
    dag=dag
)

# Tarefa final
fim = DummyOperator(
    task_id='fim',
    dag=dag
)

# Definindo o fluxo de tarefas
inicio >> branching
branching >> [task_a, task_b]
task_a >> fim
task_b >> fim

## O que é TaskGroup no Airflow?

O **TaskGroup** é uma funcionalidade do Apache Airflow que permite agrupar várias tarefas logicamente dentro de um DAG, facilitando a organização e visualização de pipelines complexos. Ele não altera a lógica de execução, mas melhora a **manutenção e monitoramento**, criando uma visão mais clara dos fluxos de trabalho.

- **Benefícios**:
  - Organização: Agrupa tarefas relacionadas, reduzindo a poluição visual no gráfico de dependências.
  - Escalabilidade: Ajuda a manter o código mais limpo e modular.
  - Monitoramento: Facilita o monitoramento e troubleshooting de grupos específicos de tarefas.

- **Uso Comum**:
  - Separar etapas como **extração**, **transformação** e **carga** em grupos dentro de um pipeline de ETL.
  - Agrupar tarefas relacionadas a diferentes APIs ou processos distintos, mas interconectados.


In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

# Funções para serem executadas em cada tarefa
def task_a():
    print("Executando Tarefa A")

def task_b():
    print("Executando Tarefa B")

def task_c():
    print("Executando Tarefa C")

# Definição do DAG
with DAG(
    'exemplo_task_group',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Criando o grupo de tarefas
    with TaskGroup('grupo_1', tooltip="Grupo 1 de tarefas") as grupo_1:
        tarefa_1 = PythonOperator(
            task_id='tarefa_a',
            python_callable=task_a
        )
        tarefa_2 = PythonOperator(
            task_id='tarefa_b',
            python_callable=task_b
        )

    # Outra tarefa fora do grupo
    tarefa_3 = PythonOperator(
        task_id='tarefa_c',
        python_callable=task_c
    )

    # Definindo as dependências
    grupo_1 >> tarefa_3
