# Apache Airflow

## Intro

Apache Airflow (or simply _Airflow_) is a platform to programmatically create (author), schedule, and monitor workflows.

We can use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes the tasks on an array of workers while following the specified dependencies. Rich command line utilities make it easy to perform complex operations on DAGs. The user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

```bash
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

# start the web server, default port is 8080
airflow webserver --port 8080

# start the scheduler
# open a new terminal, or run the webserver with the -D option to run it as a daemon
airflow scheduler

# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
```

In [1]:
# Command line tool, subcommands
!airflow -h

usage: airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery         Celery components
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      sync-perm      Update permissions for e

### Running a Workflow

Running a simple task:
```bash
airflow tasks run <dag_id> <task_id> <execution_date>
```

Example
```bash
# Run a task instance
airflow tasks run example_bash_operator runme_0 2021-01-01
# Run a backfill over 2 days
airflow dags backfill example_bash_operator \
    --start-date 2021-01-01 \
    --end-date 2021-01-02
```

### DAG

> In Airflow, a pipeline is represented as a Directed Acyclic Graph or DAG. The nodes of the graph represent tasks that are executed. The directed connections between nodes represent dependencies between the tasks. Representing a data pipeline as a DAG makes much sense, as some tasks need to finish before others can start. [source](https://datacamp.com)
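
The ordering idea behind a DAG can be sketched with the standard library alone. The snippet below is not Airflow code: the task names `extract`, `transform`, and `load` are hypothetical, and `graphlib` (Python 3.9+) stands in for the scheduler's dependency resolution. It shows that an acyclic dependency graph always yields a valid run order, while a cycle does not.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order; raises CycleError if deps contains a cycle."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Report whether the dependency graph can be ordered at all."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)  # ['extract', 'transform', 'load']
```

A graph such as `{"a": {"b"}, "b": {"a"}}` has no valid order, which is why a workflow must be acyclic.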



In [None]:
from airflow.models import DAG
from datetime import datetime

# Default arguments (optional) that will be applied to the components of DAG
default_args = {
    'owner': 'name',
    'email': 'name@gmail.com',  # for alerting
    'start_date': datetime(2020, 1, 20),
    'retries': 2
}

etl_dag = DAG('example_etl', default_args=default_args)

The `airflow` command-line tool provides a lot of useful information when creating and troubleshooting workflows.

In [10]:
!airflow dags -h

usage: airflow dags [-h] COMMAND ...

Manage DAGs

positional arguments:
  COMMAND
    backfill      Run subsections of a DAG for a specified date range
    delete        Delete all DB records related to the specified DAG
    list          List all the DAGs
    list-jobs     List the jobs
    list-runs     List DAG runs given a DAG id
    next-execution
                  Get the next execution datetimes of a DAG
    pause         Pause a DAG
    report        Show DagBag loading report
    show          Displays DAG's tasks with their dependencies
    state         Get the status of a dag run
    test          Execute one single DagRun
    trigger       Trigger a DAG run
    unpause       Resume a paused DAG

optional arguments:
  -h, --help      show this help message and exit

### Airflow Webserver and UI

In [12]:
!airflow webserver -h

usage: airflow webserver [-h] [-A ACCESS_LOGFILE] [-L ACCESS_LOGFORMAT] [-D]
                         [-d] [-E ERROR_LOGFILE] [-H HOSTNAME] [-l LOG_FILE]
                         [--pid [PID]] [-p PORT] [--ssl-cert SSL_CERT]
                         [--ssl-key SSL_KEY] [--stderr STDERR]
                         [--stdout STDOUT] [-t WORKER_TIMEOUT]
                         [-k {sync,eventlet,gevent,tornado}] [-w WORKERS]

Start a Airflow webserver instance

optional arguments:
  -h, --help            show this help message and exit
  -A ACCESS_LOGFILE, --access-logfile ACCESS_LOGFILE
                        The logfile to store the webserver access log. Use '-' to print to stderr
  -L ACCESS_LOGFORMAT, --access-logformat ACCESS_LOGFORMAT
                        The access log format for gunicorn logs
  -D, --daemon          Daemonize instead of running in the foreground
  -d, --debug           Use the server that ships with Flask in debug mode
  -E ERROR_LOGFILE, --error-logfile ERROR_

In [13]:
# Start webserver on port 8051
!airflow webserver -p 8051

## Airflow DAGs: Operators, Tasks, Scheduling

### Operators

Airflow Operators represent a single task (e.g. running a command, python script, etc.) in a workflow. There are various operators that perform different tasks. A list of operators can be found [here](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html). Some of those are:
- `DummyOperator`
- `BashOperator`
- `PythonOperator`
- `PostgresOperator`

Note that operators generally don't share information. 

#### `DummyOperator`
DummyOperator can be used for troubleshooting or testing.

```python
DummyOperator(task_id='example', dag=dag)
```

#### `BashOperator`

- Runs any shell command or script as a task in an Airflow workflow.
- Environment variables for the command can be set through the operator's `env` argument.
- In Airflow, the tilde character (`~`) does not by default expand to the home directory. This can be worked around with environment variables.

In [None]:
from airflow.operators.bash_operator import BashOperator

# Execute a Command
task1 = BashOperator(    
    task_id='bash_example',  # Name that shows up in the UI
    bash_command='echo "Hello!"',
    dag=my_dag  # Add the task to the DAG
)

# Execute a Script
task2 = BashOperator(
    task_id='bash_script',
    bash_command='runupdate.sh',
    dag=my_dag)
    
task3 = BashOperator(
    task_id='clean_address',
    bash_command='cat address.txt | awk "NF==10" > cleaned.txt',
    dag=my_dag)

Define multiple `BashOperator`s within a workflow (same DAG):

In [None]:
# Define the first operator
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define the second operator
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag)

#### `PythonOperator`

Operates similarly to the `BashOperator`. It executes a Python function or other callable and can pass arguments to it.
- Supports both positional (`op_args`) and keyword (`op_kwargs`) arguments.

In [None]:
from airflow.operators.python_operator import PythonOperator

def my_func():
    print("Hallo!")

python_task = PythonOperator(
    task_id='simple_print',
    python_callable=my_func,
    dag=example_dag
)

In [None]:
import time

def sleep(sec):
    time.sleep(sec)

sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'sec': 1},  # Keys must match the function's parameter names
    dag=example_dag
)

### Tasks

Tasks are instances of operators. Although a task is usually assigned to a Python variable, Airflow refers to it by its `task_id`, not by the variable name.

Task Dependencies and Task Ordering:

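Task dependencies are described in terms of _upstream_ and _downstream_ tasks. An upstream task must complete before any of its downstream tasks start. Since Airflow 1.8, dependencies can be defined using the _bitshift_ operators.
- `>>`: the task on the left runs before the task on the right (`a >> b` makes `a` upstream of `b`)
- `<<`: the task on the left runs after the task on the right (`a << b` makes `a` downstream of `b`)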
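
To make the bitshift syntax less magical, here is a minimal sketch of how such operators can be implemented with Python's `__rshift__` and `__lshift__` methods. The `Task` class is a toy stand-in invented for illustration, not Airflow's `BaseOperator`; it only records which task ids sit upstream and downstream.

```python
class Task:
    """Toy stand-in for an operator instance (hypothetical, not Airflow's class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must finish first
        self.downstream = set()  # ids of tasks that run afterwards

    def __rshift__(self, other):
        # self >> other: self runs first, other runs after it.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning the right-hand task allows a >> b >> c

    def __lshift__(self, other):
        # self << other: other runs first, self runs after it.
        other >> self
        return other


first, second, third = Task("first"), Task("second"), Task("third")
first >> second >> third

print(second.upstream)  # {'first'}
print(third.upstream)   # {'second'}
```

Returning the right-hand operand from `__rshift__` is what lets a chain such as `first >> second >> third` read left to right.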

In [None]:
# Define the tasks
task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=example_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=example_dag)

# Set first_task to run before second_task
task1 >> task2  # or task2 << task1


### Multistep Workflow with the `PythonOperator` and the `EmailOperator`

In [None]:
import requests

from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

def get_file(url, path):
    r = requests.get(url)
    with open(path, 'wb') as f:
        f.write(r.content)
    print(f"File downloaded from {url} and saved to {path}")

def parse_file(inputfile, outputfile):
    # Parse file
    # ..
    # Save file
    # ..
    pass  # Placeholder so the function body is valid Python

# Pull file
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=get_file,
    # Define the arguments (keys must match the parameters of get_file)
    op_kwargs={'url': 'http://dataserver/sales.json', 'path': 'latest.json'},
    dag=process_file_dag
)

# Parse file
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable = parse_file,
    # Add the arguments
    op_kwargs = {'inputfile':'latest.json', 'outputfile':'parsed.json'},
    # Add the DAG
    dag=process_file_dag
)

# Send file via email
email_task = EmailOperator(
    task_id='email_data_eng',
    to='tk@gmail.com',
    subject='Latest update',
    html_content='Attached is the latest JSON file.',
    files=['parsed.json'],  # Attachments are given as a list of file paths
    dag=process_file_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_task

### Scheduling

A DAG Run is a specific instance of a workflow at a given point in time.

Some important arguments used when scheduling a DAG are:
- `start_date`
- `end_date`: optional
- `schedule_interval`: optional; accepts `cron` syntax or one of the built-in presets.
- `max_tries`

Note that schedules can be modified via UI.

**Cron Presets**

| preset | meaning | cron |
|---|---|---|
| `None` | Don't schedule, use for exclusively "externally triggered" DAGs | |
| `@once` | Schedule once and only once | |
| `@hourly` | Run once an hour at the beginning of the hour | `0 * * * *` |
| `@daily` | Run once a day at midnight | `0 0 * * *` |
| `@weekly` | Run once a week at midnight on Sunday morning | `0 0 * * 0` |
| `@monthly` | Run once a month at midnight of the first day of the month | `0 0 1 * *` |
| `@quarterly` | Run once a quarter at midnight on the first day | `0 0 1 */3 *` |
| `@yearly` | Run once a year at midnight of January 1 | `0 0 1 1 *` |

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html
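
The relationship between presets and cron expressions can be sketched in a few lines of plain Python. This is only an illustration, not how Airflow parses schedules: `CRON_PRESETS` copies the table above, and `cron_fields` is a hypothetical helper that labels the five cron fields.

```python
# Preset-to-cron mapping, copied from the table above.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}

# Order of the five fields in a standard cron expression.
FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def cron_fields(schedule):
    """Expand a preset if needed and label each field of the cron expression."""
    expression = CRON_PRESETS.get(schedule, schedule)
    parts = expression.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}: {expression!r}")
    return dict(zip(FIELDS, parts))


print(cron_fields("@daily"))
print(cron_fields("30 12 * * 3"))
```

Read this way, `30 12 * * 3` means minute 30, hour 12, any day of the month, any month, day of week 3 (Wednesday).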

In [None]:
from datetime import datetime, timedelta

from airflow.models import DAG

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# Cron schedule: every Wednesday at 12:30
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')

In [None]:
import requests
import json

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator


default_args = {
    'owner':'data_eng',
    'start_date': datetime(2020, 2, 15),
    'retries': 1,
}

process_file_dag = DAG(
    dag_id='process_file', 
    default_args=default_args, 
    schedule_interval='@monthly')


def get_file(url, path):
    r = requests.get(url)
    with open(path, 'wb') as f:
        f.write(r.content)
    print(f"File downloaded from {url} and saved to {path}")


def parse_file(inputfile, outputfile):
    with open(inputfile) as infile:
        data=json.load(infile)
        with open(outputfile, 'w') as outfile:
            json.dump(data, outfile)
        
# Pull file
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=get_file,
    # Define the arguments (keys must match the parameters of get_file)
    op_kwargs={'url': 'http://example/file.json', 'path': 'latest.json'},
    dag=process_file_dag
)

# Parse file
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable = parse_file,
    # Add the arguments
    op_kwargs = {'inputfile':'latest.json', 'outputfile':'parsed.json'},
    # Add the DAG
    dag=process_file_dag
)

# Send file via email
email_task = EmailOperator(
    task_id='email_data_eng',
    to='tk@gmail.com',
    subject='Latest update',
    html_content='Attached is the latest JSON file.',
    files=['parsed.json'],  # Attachments are given as a list of file paths
    dag=process_file_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_task

## Airflow Workflow

### Sensors

A Sensor is an operator that waits for a condition to become true, such as a response from a web request, the creation of a file, or an update to a database. Like other operators, sensors are assigned to tasks.

Sensors include, but are not limited to:
- `FileSensor`
- `HttpSensor`
- `ExternalTaskSensor`
- `SqlSensor`

Some arguments used when defining sensors are:
- `mode`=`poke` to run repeatedly or `reschedule` to try later
- `poke_interval`: time interval between checks
- `timeout`
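
The interplay of `poke_interval` and `timeout` can be illustrated with a small polling loop. This is a simplified sketch of the general technique, not Airflow's sensor implementation; `wait_for` and its `condition` argument are hypothetical names introduced here.

```python
import time


def wait_for(condition, poke_interval=60.0, timeout=3600.0):
    """Call condition() every poke_interval seconds until it returns True.

    Raises TimeoutError if the condition is still false when the next
    check would fall after the timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError("condition was not met before the timeout")
        time.sleep(poke_interval)


# Hypothetical condition that becomes true on the third check.
checks = iter([False, False, True])
result = wait_for(lambda: next(checks), poke_interval=0.01, timeout=1.0)
print(result)  # True
```

A short `poke_interval` reacts faster but checks more often; the `timeout` bounds how long the task may wait in total.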

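**When to use a Sensor?**

A sensor is a good fit when it is uncertain when a condition will become true and the workflow should keep checking rather than fail immediately. Understanding how sensors are used within Airflow provides a lot of flexibility when defining workflows.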

In [None]:
from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sensor',
                              filepath='data.csv',
                              poke_interval=120,
                              dag=report_dag)

init_cleanup >> file_sensor_task >> generate_report

### Executors

An Executor runs the tasks defined within the workflow. Different executors handle running the tasks differently. Some executors are:
- `SequentialExecutor`: Default executor, which runs one task at a time. It is mainly suitable for development and debugging.
- `LocalExecutor`: Runs on a single system and treats tasks as processes, allowing concurrent runs. It is a good choice for a single production Airflow system.
- `CeleryExecutor`: Uses a Celery backend to distribute tasks across multiple worker systems. Although powerful, it is more difficult to set up.

In [31]:
!airflow config list | grep "executor = "

executor = SequentialExecutor


In [5]:
!ls ~/airflow

airflow.cfg         logs                webserver_config.py
airflow.db          unittests.cfg


In [6]:
!cat ~/airflow/airflow.cfg | grep "executor = "

executor = SequentialExecutor


With the `SequentialExecutor`, a sensor in `poke` mode can cause problems: it occupies the only task slot for as long as it waits, so no other task can run until the condition is met. Changing the mode to `reschedule` gives Airflow a chance to run another task while waiting for the `data.csv` file. Another option is to switch to an executor (e.g. `LocalExecutor`) that allows parallelism greater than 1.
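
The difference between the two modes on a single task slot can be shown with a toy simulation. Everything here is an assumption made for illustration: `simulate` is a hypothetical function, time is divided into abstract ticks, and `other_task` is an unrelated task that needs one tick. It is not a model of Airflow's actual scheduler.

```python
def simulate(mode, file_arrives_at, max_ticks=10):
    """Return, tick by tick, which task held the single worker slot."""
    log = []
    sensor_done = other_done = False
    sensor_turn = True  # the sensor is queued first
    for tick in range(max_ticks):
        if sensor_done and other_done:
            break
        # In poke mode the sensor never gives up the slot until it succeeds.
        # In reschedule mode it releases the slot after each failed check.
        if not sensor_done and (mode == "poke" or sensor_turn or other_done):
            log.append("sensor")
            sensor_done = tick >= file_arrives_at
            sensor_turn = False
        else:
            log.append("other_task")
            other_done = True
            sensor_turn = True
    return log


print(simulate("poke", file_arrives_at=3))
print(simulate("reschedule", file_arrives_at=3))
```

In the `poke` run, `other_task` only gets the slot after the file has arrived; in the `reschedule` run it gets the slot right after the sensor's first failed check.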

In [None]:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor

my_dag = DAG(
    dag_id = 'execute_this',
    schedule_interval = "0 0 * * *"
)

check = FileSensor(
    task_id='check_file',
    filepath='data.csv',
    start_date=datetime(2021,1,1),
    mode='poke',  # Change this to 'reschedule'
    dag=my_dag
)

do_this = BashOperator(
    task_id='do_this',
    bash_command='do_this.sh',
    start_date=datetime(2021,1,1),
    dag=my_dag
)

# Run the sensor first, then the bash task
check >> do_this
