# Apache Airflow Tutorial
## ⚠️ Cost-Saving Note
Amazon MWAA can be expensive to run. To minimize cost, please form a temporary group of two or more students (this group may differ from your homework group).

## Setting Up Airflow on AWS (Amazon MWAA)
0. Create an S3 bucket

    - Go to the AWS Management Console and ensure your region is set to N. Virginia.

    - Create a new bucket named lastname1-lastname2-bucket (replace with your actual last names). S3 bucket names must be lowercase and globally unique.

    - Enable Bucket Versioning (Amazon MWAA requires it) and leave all other settings as default.

    - After the bucket is created, create a folder named dags inside it.

1. Open Amazon MWAA

    - In the AWS Console search bar, type Amazon MWAA and navigate to the service.

    - Make sure the region is N. Virginia, then click "Create environment".

2. Configure the environment

    - Set the environment name to Lastname1-Lastname2-MWAA.

    - Select the latest Airflow version available.

3. Configure storage

    - For the S3 bucket, choose the one you just created.

    - Set the DAGs folder to the dags folder you created inside that bucket.

4. Configure networking

    - For VPC, select MWAA-VPC-DE300.

    - For Web server access, choose Public.

5. Launch the environment

    - Scroll to the bottom of the page and click "Create environment".

    - It will take approximately 20–30 minutes for the environment to become active.

## Introduction
Apache Airflow is an open-source workflow orchestration tool that allows users to define, schedule, and monitor workflows as Directed Acyclic Graphs (DAGs). It is widely used for automating ETL (Extract, Transform, Load) processes, data pipelines, and machine learning workflows.

This tutorial will guide you through the installation, basic concepts, and workflow creation using Airflow.

## Installation
Before installing Airflow, make sure pip is up to date. You can upgrade it with:
```
python -m pip install --upgrade pip
```
Then install Airflow with:
```
pip install apache-airflow
```
The lab examples may also need the following two provider packages:
```
pip install apache-airflow-providers-sftp
pip install apache-airflow-providers-postgres
```

## Concepts
### DAG (Directed Acyclic Graph)

A Directed Acyclic Graph (DAG) is a collection of tasks that are organized in a way that reflects their dependencies. In Airflow, DAGs define workflows where tasks must be executed in a specified sequence without any cyclic dependencies (loops). Each DAG has a start date, a schedule interval, and a set of tasks connected by dependencies.

Key characteristics of DAGs:

- Directed: Tasks execute in a specific order, from upstream to downstream.

- Acyclic: Following the dependencies can never lead back to the starting task, so no task depends (directly or indirectly) on itself. This prevents infinite loops.

- Graph: A network structure that allows for complex task dependencies.
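These properties can be checked with Python's standard library alone. The sketch below uses `graphlib.TopologicalSorter` (Python 3.9+) on a hypothetical extract/transform/load pipeline, where each entry maps a task to the tasks it depends on. `static_order()` yields an order in which every task comes after its upstream tasks, and `prepare()` raises `CycleError` when the dependencies loop, which is the situation Airflow rejects when it parses a DAG file.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on (its upstream tasks).
pipeline = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load", "transform"},
}


def execution_order(graph: dict) -> list:
    """Return one valid run order: every task appears after all of its upstream tasks."""
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph: dict) -> bool:
    """Return True if the dependencies contain no loop, i.e. the graph is a valid DAG."""
    try:
        TopologicalSorter(graph).prepare()
        return True
    except CycleError:
        return False


print(execution_order(pipeline))  # ['extract', 'transform', 'load', 'report']
# Making "extract" depend on "report" closes a loop, so the graph is no longer a DAG.
print(is_acyclic({**pipeline, "extract": {"report"}}))  # False
```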


### Tasks and Operators

Tasks are the fundamental building blocks of a DAG. Airflow provides predefined Operators for various tasks, such as:

- `PythonOperator` – Runs Python functions

- `BashOperator` – Executes shell commands

- `PostgresOperator` – Runs SQL queries on a Postgres database (recent releases of the Postgres provider deprecate it in favor of `SQLExecuteQueryOperator`)

- `SFTPOperator` – Transfers files via SFTP

## A Simple DAG Example
#### 0. Import required packages

```python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
import pendulum
import random
```

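#### 1. Define the workflow schedule frequency
Scheduling: Airflow supports cron-like scheduling through the DAG's `schedule` argument (called `schedule_interval` before Airflow 2.4), e.g., `@daily`, `@hourly`, or a cron expression such as `0 12 * * *` (every day at 12:00).

```python
WORKFLOW_SCHEDULE = "@hourly"
```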
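To see which moments a cron expression such as `0 12 * * *` selects, here is a deliberately simplified matcher written for illustration only. It is not how Airflow evaluates schedules (Airflow 2 delegates cron parsing to the `croniter` library), and it supports only `*` or a single integer in each of the five fields: minute, hour, day of month, month, and day of week (0 = Sunday). The `PRESETS` table lists the cron equivalents of the `@hourly` and `@daily` presets; `cron_matches` and `next_matches` are hypothetical helper names.

```python
from datetime import datetime, timedelta

# Cron equivalents of two of Airflow's schedule presets.
PRESETS = {"@hourly": "0 * * * *", "@daily": "0 0 * * *"}


def field_matches(field: str, value: int) -> bool:
    """'*' matches any value; otherwise the field must be a single integer."""
    return field == "*" or int(field) == value


def cron_matches(expr: str, when: datetime) -> bool:
    """Simplified test of a 5-field cron expression against one moment in time."""
    minute, hour, dom, month, dow = PRESETS.get(expr, expr).split()
    dom_ok = field_matches(dom, when.day)
    dow_ok = field_matches(dow, when.isoweekday() % 7)  # cron counts Sunday as 0
    if dom != "*" and dow != "*":
        day_ok = dom_ok or dow_ok  # classic cron: either restricted day field may match
    else:
        day_ok = dom_ok and dow_ok
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(month, when.month)
        and day_ok
    )


def next_matches(expr: str, start: datetime, count: int, max_minutes: int = 366 * 24 * 60) -> list:
    """Scan minute by minute after `start` and return up to `count` matching times."""
    current = start.replace(second=0, microsecond=0)
    found = []
    for _ in range(max_minutes):  # bounded, so an impossible expression cannot loop forever
        current += timedelta(minutes=1)
        if cron_matches(expr, current):
            found.append(current)
            if len(found) == count:
                break
    return found


start = datetime(2024, 1, 1, 9, 30)
print(next_matches("0 12 * * *", start, 2))  # 2024-01-01 12:00 and 2024-01-02 12:00
print(next_matches("@hourly", start, 2))  # 2024-01-01 10:00 and 11:00
```

Scanning minute by minute is slow but easy to verify by hand, which is the point of the sketch; a real scheduler computes the next match directly.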

#### 2. Default arguments dictionary for the DAG execution

```python
# Default arguments dictionary for the DAG execution
default_args = {
    'owner': 'johndoe',  # Owner of the DAG
    'depends_on_past': False,  # Ensures tasks do not depend on past runs
    'start_date': pendulum.today('UTC').add(days=-1),  # DAG start date (yesterday)
    'retries': 1,  # Number of retry attempts upon failure
}
```
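Every operator attached to the DAG picks up these values unless the operator passes its own value for the same argument. The plain-Python sketch below only models that precedence rule with a dictionary merge; the helper `effective_task_args` is hypothetical and not part of Airflow, and `sample_defaults` is a trimmed copy of the dictionary above (without `start_date`, so no `pendulum` import is needed and the real `default_args` is not overwritten).

```python
def effective_task_args(defaults: dict, **task_args) -> dict:
    """Model the precedence rule: values passed to a task override the DAG-level defaults."""
    return {**defaults, **task_args}


# Trimmed copy of the defaults above (start_date left out).
sample_defaults = {"owner": "johndoe", "depends_on_past": False, "retries": 1}

print(effective_task_args(sample_defaults, task_id="task1", retries=3))
# {'owner': 'johndoe', 'depends_on_past': False, 'retries': 3, 'task_id': 'task1'}
```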

#### 3. Task Definition

```python
# Task 1: Generates a success/failure outcome randomly
def task1_func(**kwargs):
    """
    Simulates a task that randomly determines success or failure.
    Output:
        - 'success' (50% probability)
        - 'failure' (50% probability)
    """
    print('Running task 1')
    value = 'success' if random.random() < 0.5 else 'failure'
    print(f'Task 1 output: {value}')
    return {'status': value}

# Task 2: Generates a random integer between 0 and 10
def task2_func(**kwargs):
    """
    Simulates a task that generates a random integer in the range [0, 10].
    Output:
        - Random integer (0-10)
    """
    print('Running task 2')
    value = random.randint(0, 10)
    print(f'Task 2 output: {value}')
    return {'value': value}

# Task 3: Generates two random integers between 0 and 10
def task3_func(**kwargs):
    """
    Simulates a task that generates two random integers in the range [0, 10].
    Output:
        - Two random integers (0-10)
    """
    print('Running task 3')
    value_one = random.randint(0, 10)
    value_two = random.randint(0, 10)
    print(f'Task 3 output: {value_one} {value_two}')
    return {'value1': value_one, 'value2': value_two}

# Task 4: Aggregates results from Task 1, Task 2, and Task 3
def task4_func(**kwargs):
    """
    Collects results from task1, task2, and task3 using XCom.
    Decides whether task5 should be executed based on:
        - If task3's first value is greater than the second value
        - If task1 returned 'success'
    Output:
        - "do-task5" if conditions are met, else empty string
    """
    print('Running task 4')
    ti = kwargs['ti']
    task1_return_value = ti.xcom_pull(task_ids='task1')
    task2_return_value = ti.xcom_pull(task_ids='task2')
    task3_return_value = ti.xcom_pull(task_ids='task3')

    print("Task 1 returned: ", task1_return_value)
    print("Task 2 returned: ", task2_return_value)
    print("Task 3 returned: ", task3_return_value)

    return_value = ""
    if task3_return_value['value1'] > task3_return_value['value2'] and task1_return_value['status'] == "success":
        return_value = "do-task5"

    return return_value

# Branching decision function
def decide_which_path(**kwargs):
    """
    Determines the next task to execute based on task4's output.
    Output:
        - 'task5' if task4 returns "do-task5"
        - 'dummy_task' otherwise
    """
    ti = kwargs['ti']
    task4_return_value = ti.xcom_pull(task_ids='task4')
    if task4_return_value == "do-task5":
        return 'task5'
    else:
        return 'dummy_task'

# Task 5: Final task execution
def task5_func(**kwargs):
    """
    Executes task5 if the branching condition is met.
    Output:
        - A dictionary indicating task completion
    """
    print('Running task 5')
    return {'task': 'task5', 'status': 'completed'}
```
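Because these callables are ordinary Python functions that read the task instance from `kwargs['ti']`, their logic can be checked without a running scheduler. A minimal sketch, assuming a hypothetical `StubTaskInstance` class that imitates only `xcom_pull(task_ids=...)` for the default `return_value` key; `decide_which_path` is repeated (with the same rule) so the block runs on its own.

```python
class StubTaskInstance:
    """Hypothetical test double that returns canned XCom values keyed by task_id."""

    def __init__(self, return_values: dict):
        self.return_values = return_values

    def xcom_pull(self, task_ids: str, key: str = "return_value"):
        # Only values stored under the default 'return_value' key are modelled.
        if key != "return_value":
            return None
        return self.return_values.get(task_ids)


def decide_which_path(**kwargs):
    # Same rule as the branching function above.
    task4_return_value = kwargs["ti"].xcom_pull(task_ids="task4")
    return "task5" if task4_return_value == "do-task5" else "dummy_task"


print(decide_which_path(ti=StubTaskInstance({"task4": "do-task5"})))  # task5
print(decide_which_path(ti=StubTaskInstance({"task4": ""})))  # dummy_task
```

With the cells above executed, the same stub can drive `task4_func`; for example, `task4_func(ti=StubTaskInstance({'task1': {'status': 'success'}, 'task2': {'value': 3}, 'task3': {'value1': 7, 'value2': 2}}))` should return `'do-task5'`.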

#### 4. Instantiate the DAG

```python
dag = DAG(
    'example_dag',  # DAG ID (unique name of the DAG)
    default_args=default_args,
    description='An example DAG with dependencies',
    schedule=WORKFLOW_SCHEDULE,  # How often the DAG runs
    tags=["de300"]  # DAG tagging for categorization
)
```

#### 5. Define the tasks

```python
# Task 1: Generates success/failure
task1 = PythonOperator(
    task_id='task1',
    python_callable=task1_func,
    dag=dag,
)

# Task 2: Generates a random integer
task2 = PythonOperator(
    task_id='task2',
    python_callable=task2_func,
    dag=dag,
)

# Task 3: Generates two random integers
task3 = PythonOperator(
    task_id='task3',
    python_callable=task3_func,
    dag=dag,
)

# Task 4: Aggregates task results and makes a decision
task4 = PythonOperator(
    task_id='task4',
    python_callable=task4_func,
    dag=dag,
)

# Branching task: Determines execution path based on task4's output
decide = BranchPythonOperator(
    task_id='branch_task',
    python_callable=decide_which_path,
    dag=dag,
)

# Task 5: Final task if the condition is met
task5 = PythonOperator(
    task_id='task5',
    python_callable=task5_func,
    dag=dag,
)

# Dummy task: Acts as an alternative path
dummy_task = EmptyOperator(
    task_id='dummy_task',
    dag=dag,
)
```

#### 6. Define the dependencies among tasks

```python
[task1, task2, task3] >> task4 >> decide  # Task1, Task2, Task3 -> Task4 -> Decision

decide >> task5  # If condition met, execute Task5
decide >> dummy_task  # If condition not met, execute Dummy Task
```
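Because the branch depends only on random values, you can work out how often each path should run. Under the rules coded above (task 1 succeeds with probability 1/2, and task 3 draws two independent integers uniformly from 0 to 10), `task5` runs only when task 1 succeeds and the first integer exceeds the second. The sketch below counts those outcomes exactly with `fractions.Fraction`; `branch_probabilities` is a hypothetical helper name.

```python
from fractions import Fraction
from itertools import product


def branch_probabilities() -> dict:
    """Exact probability of each branch in the example DAG."""
    values = range(0, 11)  # random.randint(0, 10) includes both endpoints
    pairs = list(product(values, repeat=2))  # all (value1, value2) outcomes of task3
    favourable = sum(1 for value1, value2 in pairs if value1 > value2)
    p_value1_greater = Fraction(favourable, len(pairs))  # 55/121 = 5/11
    p_success = Fraction(1, 2)  # random.random() < 0.5 in task1
    p_task5 = p_success * p_value1_greater  # task1 and task3 are independent
    return {"task5": p_task5, "dummy_task": 1 - p_task5}


print(branch_probabilities())
# {'task5': Fraction(5, 22), 'dummy_task': Fraction(17, 22)}
```

So `task5` should run in roughly 23% of DAG runs, and `dummy_task` in the remaining 77%.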

### Best Practices

1. Parameterize workflows: Use Airflow Variables and Jinja templates (e.g., `{{ ds }}` for the run's logical date) instead of hard-coded values.

2. Use task dependencies: Define dependencies using `>>` and `<<`.

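## 0. Read parameters from a TOML configuration file (optional)
```python
import pathlib

import tomli  # on Python 3.11+, the standard-library tomllib provides the same load()

CONFIG_FILE = "config.toml"  # hypothetical path; point this at your own TOML file

def read_config() -> dict:
    """Load DAG parameters from CONFIG_FILE into a dictionary."""
    path = pathlib.Path(CONFIG_FILE)
    with path.open(mode="rb") as param_file:  # tomli requires a binary file handle
        params = tomli.load(param_file)
    return params
```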
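To see what `read_config` hands back, the sketch below parses a small TOML document held in a string instead of a file. It uses the standard-library `tomllib` on Python 3.11+ and falls back to `tomli`, which has the same `load`/`loads` API; the `[workflow]` table and its `schedule` and `retries` keys are made-up examples, not settings Airflow reads on its own.

```python
try:
    import tomllib  # Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # third-party backport with the same API

# Hypothetical contents of a config file for this DAG.
CONFIG_TEXT = """
[workflow]
schedule = "@daily"
retries = 2
"""

params = tomllib.loads(CONFIG_TEXT)
print(params)  # {'workflow': {'schedule': '@daily', 'retries': 2}}

# Values can then feed the DAG definition, e.g. schedule=params["workflow"]["schedule"].
schedule = params["workflow"]["schedule"]
retries = params["workflow"]["retries"]
```

TOML tables such as `[workflow]` become nested dictionaries, and integers stay integers, so no manual type conversion is needed.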

## 1. DAG Definition and Configuration
- `dag_id`: Unique identifier for the DAG.
- `default_args`: Dictionary with default parameters for tasks.
- `schedule`: Defines how often the DAG runs (called `schedule_interval` before Airflow 2.4).
- `start_date`: Determines when the DAG starts running.
- `tags`: Helps categorize DAGs in the UI.

```python
from airflow import DAG
import pendulum

default_args = {
    'owner': 'user',
    'depends_on_past': False,
    'start_date': pendulum.today('UTC').add(days=-1),
    'retries': 1,
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple DAG example',
    schedule="@daily",  # named schedule_interval before Airflow 2.4
    tags=['example']
)
```

## 2. Task Definition
### PythonOperator
Usage: Executes a Python function within a DAG.
```python
from airflow.operators.python import PythonOperator

def my_function(**kwargs):
    print("Hello from PythonOperator")

task = PythonOperator(
    task_id='print_hello',
    python_callable=my_function,
    dag=dag
)
```
### BashOperator
Usage: Executes a shell command within a DAG.
```python
from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)
```
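BashOperator treats the command's exit status as the task outcome: a non-zero exit code fails the task (except exit code 99, which by default marks it skipped), and the last line the command writes to stdout is pushed to XCom by default. The sketch below uses `subprocess` to model just the failure and last-line rules for local experimentation; `run_bash_like` is a hypothetical helper, it runs the command through the system shell, and it ignores the skip code and BashOperator's environment handling.

```python
import subprocess


def run_bash_like(command: str) -> str:
    """Run `command` in the system shell; raise on a non-zero exit, else return the last stdout line."""
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        # This is the point at which BashOperator would fail the task.
        raise RuntimeError(f"command exited with code {result.returncode}: {result.stderr.strip()}")
    lines = result.stdout.strip().splitlines()
    return lines[-1] if lines else ""  # what BashOperator pushes to XCom by default


print(run_bash_like("echo first; echo second"))  # second
```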
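### EmptyOperator
Usage: Does nothing; serves as a placeholder, e.g., to mark the start and end of a workflow or to join branches.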
```python
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)
```

## 3. Task Dependencies
```python
task1 >> task2  # task1 runs before task2
task3 << task2  # task3 runs after task2

[task1, task2] >> task3  # Both task1 and task2 must complete before task3
```
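The `>>` and `<<` syntax is ordinary Python operator overloading. The toy `Task` class below is hypothetical and far simpler than Airflow's operators, which hook into the same special methods: `a >> b` calls `a.__rshift__(b)`, and because a `list` has no `>>` of its own, `[a, b] >> c` falls back to `c.__rrshift__([a, b])`. Returning the right-hand operand is what lets `a >> b >> c` chain.

```python
class Task:
    """Toy stand-in for an operator: it only records dependencies by task_id."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in (others if isinstance(others, list) else [others]):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def __rshift__(self, others):  # self >> others
        self.set_downstream(others)
        return others  # returning the right-hand side lets a >> b >> c chain

    def __lshift__(self, others):  # self << others, i.e. others run first
        for other in (others if isinstance(others, list) else [others]):
            other.set_downstream(self)
        return others

    def __rrshift__(self, others):  # [a, b] >> self; lists have no >> of their own
        for other in others:
            other.set_downstream(self)
        return self


t1, t2, t3 = Task("task1"), Task("task2"), Task("task3")
t1 >> t2
t3 << t2
[t1, t2] >> t3
print(sorted(t3.upstream))  # ['task1', 'task2']
```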

## 4. XCom for Data Passing Between Tasks
### Push Data
```python
def push_data(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='my_key', value=42)
```
### Pull Data
```python
def pull_data(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_task', key='my_key')
    print(f"Received value: {value}")
```
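XComs are looked up by task and key. When a `PythonOperator` callable returns a value, Airflow stores it under the key `return_value`, which is why `xcom_pull(task_ids='task1')` in the earlier example needs no `key`, while a value pushed with `key='my_key'` must be pulled with that same key. The in-memory `XComStore` below is a hypothetical model of that lookup rule only; real XComs are kept in Airflow's metadata database and scoped to a DAG run.

```python
class XComStore:
    """Hypothetical in-memory model of XCom lookup by (task_id, key)."""

    DEFAULT_KEY = "return_value"  # key under which a callable's return value is stored

    def __init__(self):
        self._values = {}

    def push(self, task_id: str, key: str, value):
        self._values[(task_id, key)] = value

    def pull(self, task_ids: str, key: str = DEFAULT_KEY):
        # Missing entries come back as None, as xcom_pull does when nothing matches.
        return self._values.get((task_ids, key))


store = XComStore()
store.push("push_task", "my_key", 42)  # like ti.xcom_push(key='my_key', value=42)
store.push("task1", XComStore.DEFAULT_KEY, {"status": "success"})  # like returning a value

print(store.pull("push_task", key="my_key"))  # 42
print(store.pull("push_task"))  # None: push_data returned nothing
print(store.pull("task1"))  # {'status': 'success'}
```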

## 5. Branching Logic
```python
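import random

from airflow.operators.python import BranchPythonOperator

def choose_branch(**kwargs):
    # Return the task_id to follow; 'task_A' and 'task_B' must be tasks directly
    # downstream of this branch task, and the one not returned is skipped.
    return 'task_A' if random.random() < 0.5 else 'task_B'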

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    dag=dag
)
```
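A branch callable returns one `task_id` (or a list of them) to follow; every other task directly downstream of the branch is marked as skipped, and with the default `all_success` trigger rule that skip carries on to their own downstream tasks. The function below is a hypothetical model of the first part of that rule, applied to the `task_A`/`task_B` example and to the `task5`/`dummy_task` pair from the example DAG.

```python
def branch_outcome(chosen, direct_downstream: list) -> dict:
    """Model which direct downstream tasks would run or be skipped after a branch."""
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    return {
        task_id: "run" if task_id in chosen_ids else "skipped"
        for task_id in direct_downstream
    }


print(branch_outcome("task_A", ["task_A", "task_B"]))
# {'task_A': 'run', 'task_B': 'skipped'}
print(branch_outcome(["task_A", "task_B"], ["task_A", "task_B"]))
# {'task_A': 'run', 'task_B': 'run'}
```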

## 6. Sensors for External Triggers
### TimeSensor
```python
from datetime import time

from airflow.sensors.time_sensor import TimeSensor

wait_for_time = TimeSensor(
    task_id='wait_until_10am',
    target_time=time(10, 0),  # expects a datetime.time, not a string
    dag=dag
)
```
### FileSensor
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=60,  # seconds between checks
    timeout=600,  # fail if the file has not appeared after 600 seconds
    dag=dag
)
```
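Sensors work by repeatedly "poking" a condition: `FileSensor` checks for the file every `poke_interval` seconds and fails once `timeout` seconds pass without success. `wait_for` below is a hypothetical, blocking re-creation of that loop using only the standard library (Airflow's real sensors also offer a `reschedule` mode that frees the worker slot between pokes), with a temporary file standing in for `/path/to/file.txt`.

```python
import os
import tempfile
import time


def wait_for(condition, poke_interval: float, timeout: float) -> bool:
    """Call `condition` every `poke_interval` seconds until it is true or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(poke_interval)


# The temporary file already exists, so the first poke succeeds.
with tempfile.NamedTemporaryFile() as tmp:
    print(wait_for(lambda: os.path.exists(tmp.name), poke_interval=0.1, timeout=1))  # True
```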

## 7. Triggering Other DAGs
```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='other_dag',
    dag=dag
)
```

## 8. Dynamic Task Generation
```python
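def print_task_number(i):
    print(f"Task {i}")

task_list = []
for i in range(3):
    task = PythonOperator(
        task_id=f'task_{i}',
        python_callable=print_task_number,
        op_kwargs={'i': i},  # bind the current i; a bare lambda would see only the last value
        dag=dag
    )
    task_list.append(task)

task_list[0] >> task_list[1] >> task_list[2]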
```
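Loops that build callables have a classic Python pitfall: a closure such as `lambda: print(f"Task {i}")` looks up `i` when it is called, not when it is created, so once the loop has finished every such lambda sees the final value. Binding the value at creation time, through a default argument or through `PythonOperator`'s `op_kwargs`, avoids it. The plain-Python check below demonstrates both behaviours.

```python
# Pitfall: each lambda reads i when called, after the loop has finished.
late_bound = [lambda: i for i in range(3)]

# Fix: a default argument captures the value of i at creation time.
early_bound = [lambda i=i: i for i in range(3)]

print([f() for f in late_bound])  # [2, 2, 2]
print([f() for f in early_bound])  # [0, 1, 2]
```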

## 9. Task Failure Handling
```python
from datetime import timedelta

def failure_callback(context):
    print(f"Task {context['task_instance'].task_id} failed")

task = PythonOperator(
    task_id='task_with_failure_callback',
    python_callable=my_function,
    on_failure_callback=failure_callback,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)
```
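With `retries=3`, a failing task is attempted up to four times in total (one try plus three retries), waiting `retry_delay` between attempts; `on_failure_callback` fires only after the last attempt fails (intermediate failures trigger `on_retry_callback` instead). The `run_with_retries` helper below is a hypothetical, synchronous model of that policy, with the delay given in seconds so it runs quickly, and it passes a small dictionary as a stand-in for Airflow's much larger context.

```python
import time


def run_with_retries(func, retries: int, retry_delay: float, on_failure=None):
    """Call `func` up to retries + 1 times; call `on_failure` only if the final attempt fails."""
    for attempt in range(1, retries + 2):
        try:
            return func()
        except Exception as exc:
            if attempt == retries + 1:
                if on_failure is not None:
                    # Hypothetical, much smaller stand-in for Airflow's context dict.
                    on_failure({"attempt": attempt, "exception": exc})
                raise
            time.sleep(retry_delay)  # Airflow would mark the task up_for_retry and wait


calls = []


def flaky():
    """Hypothetical task that fails twice with a transient error, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient error")
    return "ok"


print(run_with_retries(flaky, retries=3, retry_delay=0.01))  # ok
print(len(calls))  # 3
```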

## 10. Parallel Task Execution
```python
parallel_task1 = PythonOperator(task_id='parallel_1', python_callable=my_function, dag=dag)
parallel_task2 = PythonOperator(task_id='parallel_2', python_callable=my_function, dag=dag)

start >> [parallel_task1, parallel_task2] >> end
```
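`start >> [parallel_task1, parallel_task2] >> end` only permits the two tasks to run at the same time; whether they actually do depends on the executor (the `SequentialExecutor` runs one task at a time, while MWAA's Celery-based workers can run them concurrently). A hypothetical stdlib model of the same fan-out/fan-in shape uses `concurrent.futures`: both branches are submitted together, and the `end` step waits for all of them.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def parallel_step(name: str, seconds: float) -> str:
    """Stand-in for a task body that takes a while (here it just sleeps)."""
    time.sleep(seconds)
    return f"{name} done"


def fan_out_fan_in(seconds: float = 0.2) -> list:
    log = ["start"]  # the upstream task has finished
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Fan out: both branches are submitted together and run concurrently.
        futures = [pool.submit(parallel_step, name, seconds) for name in ("parallel_1", "parallel_2")]
        # Fan in: result() blocks until each branch is done, like `end` waiting on its upstream tasks.
        log += [future.result() for future in futures]
    log.append("end")
    return log


print(fan_out_fan_in())  # ['start', 'parallel_1 done', 'parallel_2 done', 'end']
```

Because the two sleeps overlap, the whole call takes about as long as one branch rather than the sum of both.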