# Airflow DAGs

- DAGs are made up of components (tasks) to be executed, such as operators, sensors, etc.

## Defining the DAG

```python
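from airflow import DAG
from datetime import datetime

default_arguments = {
    'owner': 'kim',
    'email': 'kim@gmail.com',
    'start_date': datetime(2020, 1, 20)
}

# Equivalent without a context manager: etl_dag = DAG('etl_workflow', default_args=default_arguments)
with DAG('etl_workflow', default_args=default_arguments) as etl_dag:
    ...  # tasks created inside this block are attached to etl_dag automatically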
```

## Operators
- Each operator represents a single task (the operator is the class; a task is an instance of an operator), for example:
  - run a Python script
  - send an email
- Operators generally run independently and don't share information with each other

### BashOperator
- Executes a given bash command or script
- Usually defined inside a `with DAG(...):` block
- Runs the command in a temporary directory that cleans itself up afterwards
- **Can set environment variables for the command via the `env` argument (runtime settings provided by the shell)**
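```python
from airflow.operators.bash import BashOperator

# Runs a predefined bash script. Note: a bash_command ending in ".sh" is rendered as a
# Jinja template file, which Airflow looks up relative to the DAG folder (or template_searchpath).
cleanup_task = BashOperator(
    task_id = 'bash_example',
    bash_command = 'runcleanup.sh',
)

# This operator runs a simple cleanup: keep only lines with exactly 10 fields.
# Relative paths resolve against the temporary working directory; use absolute paths in practice.
bash_task = BashOperator(
    task_id = 'clean_addresses',
    bash_command = 'cat addresses.txt | awk "NF==10" > cleaned.txt',
)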
```
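`awk "NF==10"` keeps only the lines that have exactly 10 whitespace-separated fields. A rough plain-Python equivalent, which could be run with a `PythonOperator` instead (the function name `clean_addresses` is hypothetical):

```python
from pathlib import Path


def clean_addresses(src: str, dst: str) -> int:
    """Keep only lines that have exactly 10 whitespace-separated fields.

    Roughly mirrors `awk "NF==10"`: awk's default field splitting treats runs
    of spaces/tabs as one separator, like Python's str.split().
    Lines are written unchanged. Returns the number of lines kept.
    """
    kept = [
        line
        for line in Path(src).read_text().splitlines()
        if len(line.split()) == 10
    ]
    Path(dst).write_text("".join(f"{line}\n" for line in kept))
    return len(kept)
```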

## Task

- Instances of operators
- Usually assigned to a variable in Python

```python
bash_task = BashOperator(
    task_id = 'clean_addresses',
    bash_command = 'cat addresses.txt | awk "NF==10" > cleaned.txt', 
)
```

### Task Dependencies
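- Define the order of task completion
- A task that must finish first is *upstream*; a task that runs after it is *downstream*
- We use the bitshift operators to define dependencies:
  - `>>` makes the left task upstream of the right one (it runs before)
  - `<<` makes the left task downstream of the right one (it runs after)
  - `task1 >> task2` is equivalent to `task2 << task1`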

```python
task1 = BashOperator(
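    task_id = 'first_task',
    bash_command = 'echo 1',
)

task2 = BashOperator(
    task_id = 'second_task',
    bash_command = 'echo 2'
)

task1 >> task2  # same as: task2 << task1

# A task can have several upstream tasks (task3 would be another task defined like task1):
# task1 >> task2
# task3 >> task2
# or, equivalently: [task1, task3] >> task2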
```
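Airflow implements `>>` and `<<` by overloading Python's bitshift operators on its task classes. The toy `Task` class below is not Airflow's code, just a simplified model of the same idea, including chaining and the list form:

```python
class Task:
    """Hypothetical stand-in for an Airflow operator, showing how >> and <<
    can be implemented with Python operator overloading."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()  # task_ids that run after this task

    def set_downstream(self, other):
        for t in other if isinstance(other, list) else [other]:
            self.downstream.add(t.task_id)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning `other` enables chaining: a >> b >> c

    def __lshift__(self, other):   # self << other  means  other >> self
        for t in other if isinstance(other, list) else [other]:
            t.set_downstream(self)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (lists have no >> of their own)
        for t in other:
            t.set_downstream(self)
        return self

    def __rlshift__(self, other):  # [a, b] << self  means  self >> [a, b]
        self.set_downstream(other)
        return self


a, b, c = Task("a"), Task("b"), Task("c")
a >> b >> c
print(a.downstream, b.downstream)  # {'b'} {'c'}
```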

----

## Python Operators
- executes a callable (function)
- can pass in arguments to the python code

```python
from airflow.operators.python import PythonOperator

def printme():
    print('something')

python_task = PythonOperator(
    task_id = 'simple_print',
    python_callable=printme
)
```

### You can pass arguments:
- `op_kwargs`: a dict of keyword arguments passed to the callable

```python
import time
from airflow.operators.python import PythonOperator

def sleep(length_of_time):
    time.sleep(length_of_time)

sleep_task = PythonOperator(
    task_id = 'sleep',
    python_callable = sleep,
    op_kwargs = {'length_of_time': 5}
)
```
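When the task runs, `PythonOperator` essentially calls `python_callable(*op_args, **op_kwargs)`, where `op_args` is the positional counterpart of `op_kwargs` (Airflow can also add context variables, which this sketch leaves out). A hypothetical helper that mimics the call, handy for trying a callable before wiring it into a DAG:

```python
def run_like_python_operator(python_callable, op_args=None, op_kwargs=None):
    """Hypothetical helper: call a function roughly the way PythonOperator does.

    Airflow's context injection is not modeled here.
    """
    return python_callable(*(op_args or []), **(op_kwargs or {}))


def greet(name, punctuation="!"):
    return f"Hello, {name}{punctuation}"


print(run_like_python_operator(greet, op_kwargs={'name': 'kim'}))  # Hello, kim!
print(run_like_python_operator(greet, op_args=['kim'], op_kwargs={'punctuation': '?'}))  # Hello, kim?
```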

A typical data engineering use of `PythonOperator` is **pulling** data from a server:

```python
import requests
from airflow.operators.python import PythonOperator

def pull_file(URL, savepath):
    r = requests.get(URL)
    r.raise_for_status()  # fail the task on HTTP errors instead of saving an error page
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'}
)
```

The next tasks would typically process the pulled file and send the results by email.
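The processing step can be another `PythonOperator`. A sketch of such a callable, assuming (hypothetically) that `latestsales.json` holds a JSON list of records with a numeric `amount` field; the name `parse_file` and the summary format are illustrative only:

```python
import json


def parse_file(inputfile, outputfile):
    """Hypothetical processing step: summarize a JSON list of sales records.

    Assumes each record is a dict with a numeric "amount" key.
    """
    with open(inputfile) as f:
        records = json.load(f)
    summary = {
        "count": len(records),
        "total_amount": sum(r["amount"] for r in records),
    }
    with open(outputfile, "w") as f:
        json.dump(summary, f)
    # print() output ends up in the task log, as in pull_file
    print(f"Summarized {summary['count']} records into {outputfile}")
    return summary
```

It would be wired up like the pull task, e.g. `PythonOperator(task_id='parse_file', python_callable=parse_file, op_kwargs={'inputfile': 'latestsales.json', 'outputfile': 'parsedfile.json'})`, followed by `pull_file_task >> parse_file_task`.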



---
## Airflow Scheduling

### DAG Run
To understand scheduling, you need to know what a `DAG Run` is: an instance of a workflow at a specific point in time.
- It could be a workflow running now, or one that ran yesterday
- It can be triggered **manually** or by the `schedule_interval` passed when creating the DAG
- Each DAG run has a state:
  - running
  - failed
  - success

### Airflow DAG Scheduling

To control when your DAGs run, configure the following key attributes:

- `start_date`:  
  - **Required**  
  - The date/time from which the DAG starts scheduling (in UTC).
  - Example: `datetime(2025, 8, 1, 6, 0)` — starts August 1 at 6 AM UTC

- `end_date`:  
  - **Optional**  
  - The last date for scheduling DAG runs. After this, no new runs will be created.

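- `retries`:  
  - **Optional**  
  - Number of times a failed task is retried before it is marked as failed; usually set in `default_args` together with `retry_delay`.

- `schedule_interval`:  
  - **Effectively required**: set it explicitly, since the default differs between Airflow versions (Airflow 2.4+ prefers the equivalent `schedule` argument).  
  - Defines how often the DAG runs.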
  - Accepts:
    - **Cron expressions** (e.g., `"0 12 * * *"`)  
    - **Presets**:
      - `"@once"` → run only once
      - `"@hourly"` → every hour
      - `"@daily"` → every day at midnight
      - `"@weekly"` → every Sunday at midnight
      - `"@monthly"` → first day of month at midnight
      - `"@yearly"` or `"@annually"` → Jan 1 at midnight
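Airflow 2 runs a DAG at the *end* of each schedule interval: a daily run whose logical date (`ds`) is August 1 covers August 1 to August 2 and only starts once that interval has closed. With `catchup=True` (the Airflow 2 default), one run is created for every interval that has closed since `start_date`. A simplified sketch for a fixed `timedelta` schedule (cron schedules align to the cron times instead, which this ignores; the function name is hypothetical):

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, interval, now):
    """Sketch of interval-based scheduling with a fixed timedelta.

    Each run covers [logical_date, logical_date + interval) and is triggered
    only after that interval ends. Returns (logical_date, trigger_time) pairs
    for every run whose interval has closed by `now`.
    """
    runs = []
    logical_date = start_date
    while logical_date + interval <= now:
        runs.append((logical_date, logical_date + interval))
        logical_date += interval
    return runs


start = datetime(2025, 8, 1, 6, 0)
for logical_date, trigger_time in scheduled_runs(start, timedelta(days=1), datetime(2025, 8, 3, 7, 0)):
    print(logical_date, "->", trigger_time)
# 2025-08-01 06:00:00 -> 2025-08-02 06:00:00
# 2025-08-02 06:00:00 -> 2025-08-03 06:00:00
```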

### Cron Syntax

┌──────────── minute (0 - 59)\
│ ┌────────── hour (0 - 23)\
│ │ ┌──────── day of month (1 - 31)\
│ │ │ ┌────── month (1 - 12)\
│ │ │ │ ┌──── day of week (0 - 6) (Sunday=0)\
│ │ │ │ │\
│ │ │ │ │\
\*  *  *  *   *


| Example        | Meaning                             |
|----------------|-------------------------------------|
| `* * * * *`     | Every minute                        |
| `0 * * * *`     | Every hour at minute 0              |
| `0 12 * * *`    | Every day at 12:00 PM               |
| `0 8 * * 1`     | Every Monday at 8:00 AM             |
| `30 6 * * 1-5`  | Weekdays (Mon–Fri) at 6:30 AM       |
| `15 14 1 * *`   | Every month on the 1st at 2:15 PM   |
| `*/10 * * * *`  | Every 10 minutes                    |

> Use tools like [crontab.guru](https://crontab.guru) to help visualize schedules.
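To check how the five fields combine, here is a small matcher that decides whether a given minute falls on a schedule. It is a teaching sketch (Airflow itself relies on the `croniter` library): it supports `*`, single values, ranges, steps and comma lists, but not names like `MON`, `7` for Sunday, or presets like `@daily`.

```python
from datetime import datetime


def field_matches(field, value, lo, hi):
    """Check one cron field: supports *, n, a-b, */n, a-b/n and comma lists."""
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            start, end = (int(x) for x in rng.split("-"))
        else:
            start = end = int(rng)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expr, dt):
    """Return True if the minute `dt` falls on the schedule `expr`."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (dt.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    if not (field_matches(minute, dt.minute, 0, 59)
            and field_matches(hour, dt.hour, 0, 23)
            and field_matches(month, dt.month, 1, 12)):
        return False
    dom_ok = field_matches(dom, dt.day, 1, 31)
    dow_ok = field_matches(dow, cron_dow, 0, 6)
    # Classic cron rule: if both day fields are restricted, matching either is enough
    if not dom.startswith("*") and not dow.startswith("*"):
        return dom_ok or dow_ok
    return dom_ok and dow_ok


print(cron_matches("30 6 * * 1-5", datetime(2024, 1, 1, 6, 30)))  # True (a Monday)
print(cron_matches("30 6 * * 1-5", datetime(2024, 1, 6, 6, 30)))  # False (a Saturday)
```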

### Sample code:
```python
from datetime import datetime, timedelta
from airflow import DAG

# Update the scheduling arguments as defined
default_args = {
    'owner': 'Engineering',
    'start_date': datetime(2023, 11, 1),
    'email': ['airflowresults@datacamp.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=20)
}

dag = DAG(
    dag_id='update_dataflows',
    default_args=default_args,
    schedule_interval="30 12 * * 3",  # 12:30 every Wednesday
)
```

---



## Airflow Sensors

Sensors are special Airflow operators that **wait for a condition to be met** before allowing the next task to run. They're used to detect **external triggers**, such as files, HTTP responses, database results, or other DAGs.


#### 🔧 Common Sensor Parameters

| Parameter       | Description                                        |
|-----------------|----------------------------------------------------|
| `poke_interval` | How often to check the condition (in seconds)      |
| `timeout`       | Max time to wait before failing (in seconds)       |
| `mode`          | `'poke'` (default) or `'reschedule'`               |
| `soft_fail`     | Mark task as skipped instead of failed on timeout  |
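To see how these parameters interact, here is a conceptual model of a sensor in `'poke'` mode in plain Python. It is not Airflow's implementation (the names `poke_until` and `SensorTimeout` are made up); it only shows the loop: check, sleep `poke_interval`, repeat until the condition holds or `timeout` runs out.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met in time (the task would fail)."""


def poke_until(condition, poke_interval, timeout, soft_fail=False):
    """Conceptual model of a 'poke'-mode sensor.

    Calls `condition()` every `poke_interval` seconds until it returns True.
    If `timeout` seconds pass first, return "skipped" when soft_fail=True,
    otherwise raise SensorTimeout (the task would be marked failed).
    """
    started = time.monotonic()
    while True:
        if condition():
            return "success"
        if time.monotonic() - started >= timeout:
            if soft_fail:
                return "skipped"
            raise SensorTimeout(f"condition not met within {timeout}s")
        time.sleep(poke_interval)
```

In `'reschedule'` mode the task does not sleep inside its worker slot between checks; it is rescheduled instead, freeing the slot for other tasks.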


### Built-in Sensors

| Sensor Name           | Purpose                                       |
|-----------------------|-----------------------------------------------|
| `FileSensor`          | Wait for a file in the local filesystem       |
| `S3KeySensor`         | Wait for a file (key) in an S3 bucket         |
| `HttpSensor`          | Wait for an HTTP endpoint to respond          |
| `SqlSensor`           | Wait for a SQL query to return a result       |
| `ExternalTaskSensor`  | Wait for a task in another DAG to succeed     |


### Example: Wait for File before Proceeding

- Checks for the existence of a file at a given location
- If `filepath` points to a directory, succeeds once any file exists inside it

```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_input_file',
    filepath='/data/input.csv',
    poke_interval=60,   # check every 60 seconds
    timeout=600,        # stop waiting after 10 minutes
    mode='poke'         # or use 'reschedule' for better resource usage
)
```

When do we usually use a sensor?
- When it's uncertain when the condition will become true
- When an immediate failure is not desired
- To add task repetition without writing loops

**Task Dependencies vs Sensors**

Q: But aren’t all tasks “waiting” for something because of `task1 >> task2`?

**Yes** — `task1 >> task2` means `task2` will only run if `task1` succeeds.
**However,** this only checks Airflow-internal task success.

To wait for something outside Airflow, like:
* a file to be dropped
* a response from an API
* a table to be populated in a database

...you need a **sensor** to actively check for that condition.

**Task dependencies** handle sequencing,
**sensors** handle readiness of external conditions.

### Sample code:

```python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; formerly DummyOperator
from datetime import datetime

with DAG('file_sensor_example', start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:

    wait_for_file = FileSensor(
        task_id='wait_for_input_file',
        filepath='/data/input.csv',
        poke_interval=60,
        timeout=600,
        mode='poke'
    )

    process_data = EmptyOperator(task_id='process_data')

    wait_for_file >> process_data
```

---



## Airflow Executors

An **executor** in Airflow determines **how and where tasks are executed**. While the scheduler decides *when* tasks should run, the executor is responsible for actually **running** the task instances.


### Common Executor Types

#### SequentialExecutor
- Default executor for Airflow (when first installed)
- Runs **only one task at a time**
- Not suitable for production or parallel DAGs
- Good for learning or simple local testing

#### LocalExecutor
- Runs tasks **in parallel on the same machine**
- Uses Python multiprocessing
- Parallelism controlled via:
  - `parallelism` in `airflow.cfg`
  - `max_active_tasks_per_dag` in `airflow.cfg`, or `max_active_tasks` on an individual DAG
- Suitable for single-machine production setups

#### KubernetesExecutor
- Launches each task in a **separate Kubernetes pod**
- Highly scalable and cloud-native
- Ideal for dynamic, containerized, isolated task environments

#### CeleryExecutor *(optional)*
- Uses a **distributed task queue** (e.g., Redis/RabbitMQ)
- Allows running tasks across **multiple worker nodes**
- Good for horizontally scaling across machines


### How to Check Which Executor You're Using

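- Open your `airflow.cfg` file and look in the `[core]` section:
  ```ini
  [core]
  executor = LocalExecutor
  ```
- or run `airflow info`, which reports the executor in use (`airflow config get-value core executor` prints just that setting)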
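Settings in `airflow.cfg` can be overridden by environment variables named `AIRFLOW__{SECTION}__{KEY}`, so the executor written in the file is not always the one in use. A simplified stdlib sketch of that lookup order (the function name is hypothetical, and `airflow info` remains the authoritative check):

```python
import configparser
import os


def configured_executor(cfg_path, environ=None):
    """Simplified sketch of how the executor setting is resolved.

    AIRFLOW__CORE__EXECUTOR in the environment overrides `[core] executor`
    in airflow.cfg; if neither is set, fall back to SequentialExecutor
    (the Airflow 2 default).
    """
    environ = os.environ if environ is None else environ
    if "AIRFLOW__CORE__EXECUTOR" in environ:
        return environ["AIRFLOW__CORE__EXECUTOR"]
    # interpolation=None: airflow.cfg contains '%' in log format strings
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(cfg_path)
    return parser.get("core", "executor", fallback="SequentialExecutor")
```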

---

## Debugging and Troubleshooting in Airflow

### Common problems:
1. DAG won't run on schedule
    - Check in the web UI whether the scheduler is running
    - If it isn't, start it with `airflow scheduler` from the command line
    - There may not be enough free executor slots, or the executor may be misconfigured
2. DAG won't load
    - The DAG doesn't appear in the web UI or in `airflow dags list`
    - To solve:
      - Verify the DAG file is in the folder set by `dags_folder` in `airflow.cfg`
3. Syntax errors
    - To see errors, run `airflow dags list-import-errors`
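Because a file that fails to parse never shows up as a DAG, a quick local check for syntax errors can save a trip to the scheduler logs. A hypothetical helper using Python's `ast` module; it catches syntax errors only, while missing imports or bad operator arguments still show up in `airflow dags list-import-errors`:

```python
import ast
from pathlib import Path


def find_syntax_errors(dags_folder):
    """Hypothetical pre-check: map each DAG file that fails to parse to its error."""
    errors = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            ast.parse(path.read_text(), filename=str(path))
        except SyntaxError as exc:  # also covers IndentationError
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

For example, a `with DAG(...) as dag:` line with no indented body below it would be reported here.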
  

---



## SLA (Service Level Agreement) in Airflow

In Airflow, an SLA defines the **maximum amount of time** a task should take to complete after the DAG run starts. It is used to monitor whether tasks are running on time.

### Defining an SLA

An SLA is defined **per task** using the `sla` parameter:

```python
from airflow.operators.python import PythonOperator
from datetime import timedelta

task = PythonOperator(
    task_id='process_data',
    python_callable=process_data_func,
    sla=timedelta(minutes=30),
    dag=dag
)
```
This means the task is expected to complete within 30 minutes from the start time of the DAG run.
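The SLA rule boils down to one comparison: has the task finished by the reference time plus `sla`? In Airflow 2 the reference time is when the DAG run was scheduled to start, so a run that starts late can miss its SLA even if the task itself is fast; manually triggered runs are not checked. A hypothetical helper that makes the arithmetic concrete:

```python
from datetime import datetime, timedelta


def missed_sla(run_start, sla, finished_at=None, now=None):
    """Sketch of the SLA check: the deadline is run_start + sla.

    If the task has finished, compare its finish time to the deadline;
    if it is still running, compare `now` (default: current time) instead.
    """
    deadline = run_start + sla
    if finished_at is not None:
        return finished_at > deadline
    return (now or datetime.now()) > deadline
```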

### SLA Configuration Rules
- The `sla` parameter is usually set on the operator itself, as in the example above.
- It can also go in `default_args`, in which case every task in the DAG gets that same SLA (each task is measured against it individually; it is not one timer for the whole workflow).
- If a task misses its SLA, it appears under "SLA Misses" in the UI and can trigger email alerts (if configured).

### SLA Notifications
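SLA-miss emails are sent to the addresses in each task's `email` argument (SMTP must be configured for Airflow). Failure and retry emails are controlled with `email_on_failure` and `email_on_retry`; there is no built-in `email_on_success` flag, so success notifications usually come from an `on_success_callback` or a final `EmailOperator` task:

```python
from datetime import datetime

default_args = {
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@example.com'],  # also receives SLA-miss emails
    'email_on_failure': True,
    'email_on_retry': False,
}
```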

### SLA vs Task Duration
- Tasks like PythonOperator or BashOperator often finish quickly (seconds or milliseconds).
- SLA is not based on average runtime, but on how long you're willing to wait before alerting if something is delayed.
- Even fast tasks can benefit from SLA monitoring if they are critical.

---

# 4. Building Production Pipelines

## Working with Templates

- Templates use Jinja. For example, without templates, `echo`ing different files needs one task per file:

```python
task1 = BashOperator(
    task_id = 'task_id1',
    bash_command = 'echo file1.txt',
)
task2 = BashOperator(
    task_id = 'task_id2',
    bash_command = 'echo file2.txt',
)
```

That becomes inefficient when there are 100+ files. Instead, use a Jinja template:

```python
templated_command = """
    echo "Reading {{ params.filename }}"
"""
t1 = BashOperator(
    task_id = 'template_task',
    bash_command = templated_command,
    params = {'filename': 'file1.txt'}
)
```
So the first example can be improved like this:

```python
templated_command = """
    echo "Reading {{ params.filename }}"
"""
task1 = BashOperator(
    task_id = 'task_id1',
    bash_command = templated_command,
    params = {'filename': 'file1.txt'}
)
task2 = BashOperator(
    task_id = 'task_id2',
    bash_command = templated_command,
    params = {'filename': 'file2.txt'}
)
```

You can even use Jinja `for` loops inside the command; in that case, pass a list in `params`.

#### Example code of using predefined variables and templates:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

filelist = [f'file{x}.txt' for x in range(30)]

default_args = {
  'start_date': datetime(2020, 4, 15),
}

cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')

# Modify the template to handle multiple files in a 
# single run.
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""

# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filenames': filelist},
                          dag=cleandata_dag)
```

---

## Sending Templated emails

```python
from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime

# Create the string representing the html email content
html_email_str = """
Date: {{ ds }}
Username: {{ params.username }}
"""

email_dag = DAG('template_email_test',
                default_args={'start_date': datetime(2023, 4, 15)},
                schedule_interval='@weekly')
                
email_task = EmailOperator(task_id='email_task',
                           to='testuser@datacamp.com',
                           subject="{{ macros.uuid.uuid4() }}",
                           html_content=html_email_str,
                           params={'username': 'testemailuser'},
                           dag=email_dag)
```

---

## Branching
- provides conditional logic (`if`)
- Use `BranchPythonOperator`: `from airflow.operators.python import BranchPythonOperator`


For example, if you have tasks for even days and odd days, you can create a `BranchPythonOperator` whose callable uses an `if` to pick one. The callable returns a string, which Airflow expects to be the `task_id` of the task(s) to run next:

```python
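from airflow.operators.python import BranchPythonOperator

def branch_test(**kwargs):
    # ds_nodash is 'YYYYMMDD'; its parity equals the parity of the day of the month
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(
    task_id = 'branch_task',
    dag=dag,
    provide_context=True,
    python_callable=branch_test
)

# start_task, even_day_task, etc. are assumed to be defined elsewhere in the DAG
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2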
```

> The function returns the task ID of the next task(s) to run. Airflow will skip any downstream tasks not returned.

### Q: Why does the function only return a string? How does that decide what runs?
Airflow expects the BranchPythonOperator to return a task_id (or list of task_ids). Only those returned tasks will be executed; all other downstream tasks will be marked as skipped.
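A simplified model of that rule in plain Python (the helper name is hypothetical). It only looks at the branch task's direct downstream tasks; in a real DAG, tasks further down a skipped branch (like `odd_day_task2` above) are skipped too under the default `all_success` trigger rule.

```python
def branch_outcome(downstream_task_ids, returned):
    """Sketch: which direct downstream tasks run and which are skipped.

    `returned` is what the branch callable returned: one task_id or a list.
    This sketch raises if a returned id is not a direct downstream task.
    """
    chosen = {returned} if isinstance(returned, str) else set(returned)
    unknown = chosen - set(downstream_task_ids)
    if unknown:
        raise ValueError(f"not a direct downstream task: {sorted(unknown)}")
    return {
        task_id: ("run" if task_id in chosen else "skipped")
        for task_id in downstream_task_ids
    }


print(branch_outcome(["even_day_task", "odd_day_task"], "even_day_task"))
# {'even_day_task': 'run', 'odd_day_task': 'skipped'}
```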

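### Q: Why does the function use `**kwargs`? Shouldn't it use `op_kwargs={'key': value}`?
In standard Python, `**kwargs` collects any keyword arguments passed to a function. Airflow uses this to pass the task's execution context (variables such as `ds`, `ds_nodash`, `execution_date`, `ti`) into the callable, so you don't need to pass these values yourself. In Airflow 1.x this required `provide_context=True`; in Airflow 2 the flag is deprecated and the context is passed automatically to any callable that accepts `**kwargs`, so the flag in these examples is harmless but unnecessary.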

If you want to pass your own custom arguments, you can use op_kwargs, like this:
```python
BranchPythonOperator(
    task_id='custom_branch',
    python_callable=my_func,
    provide_context = True,
    op_kwargs={'threshold': 10}
)
```
Then your function can look like:
```python
def my_func(threshold, **kwargs):
    ...
```

### Sample code:
```python
# Create a function to determine if years are different
def year_check(**kwargs):
    current_year = int(kwargs['ds_nodash'][0:4])
    previous_year = int(kwargs['prev_ds_nodash'][0:4])
    if current_year == previous_year:
        return 'current_year_task'
    else:
        return 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=year_check, provide_context=True)
# Define the dependencies
branch_task >> current_year_task
branch_task >> new_year_task
```


---

## Pipeline

Review:
- To run a specific task: `airflow tasks test <dag_id> <task_id> <date>`
- To run a full DAG: `airflow dags trigger -e <date> <dag_id>`

### Sample Production Pipeline

```python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.email import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

# Update the default arguments and apply them to the DAG.

default_args = {
  'start_date': datetime(2023,1,1),
  'sla': timedelta(minutes=90)
}
    
dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

email_subject="""
  Email report for {{ params.department }} on {{ ds_nodash }}
"""

email_report_task = EmailOperator(task_id='email_report_task',
                                  to='sales@mycompany.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)

no_email_task = EmptyOperator(task_id='no_email_task', dag=dag)

def check_weekend(**kwargs):
    # 'ds' is the run's logical date as a 'YYYY-MM-DD' string
    # ('execution_date' is a datetime object, so it can't be passed to strptime)
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    else:
        return 'no_email_task'

branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   python_callable=check_weekend,
                                   provide_context=True,
                                   dag=dag)

sensor >> bash_task >> python_task

python_task >> branch_task >> [email_report_task, no_email_task]
```
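Branch callables are plain functions, so they can be checked without a scheduler by passing only the context keys they read. A sketch for the weekday check, written to read the `ds` string (`YYYY-MM-DD`):

```python
from datetime import datetime


def check_weekend(**kwargs):
    # Reads the run's logical date from the 'ds' context key
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")
    if dt.weekday() < 5:  # 0-4 = Monday-Friday
        return 'email_report_task'
    return 'no_email_task'


# Pass only the context keys the function reads; no Airflow needed
print(check_weekend(ds='2023-01-02'))  # email_report_task (a Monday)
print(check_weekend(ds='2023-01-07'))  # no_email_task (a Saturday)
```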