# 4. Airflow

Airflow is a workflow manager: it lets you define, schedule, and monitor workflows made up of tasks.

It represents each workflow as a DAG (directed acyclic graph), in which each node corresponds to a task.

Installing Airflow could be as simple as running `pip install apache-airflow`. However, that might cause dependency errors. To prevent them, run the following commands in your __`terminal`__ (Git Bash if you are on Windows). At the time of writing, the version of Airflow is 2.1.2. If you are going to use a different version, change it in the following code:

```
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
```

These two variables store the Airflow version and your Python version (major.minor). They are used to build the URL of the matching constraints file:

```
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
```
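The same URL can be assembled in Python, which may help if you want to inspect it before installing. This is only a minimal sketch: `build_constraint_url` is a hypothetical helper (not part of Airflow), and it assumes the constraint files keep the naming pattern used above.

```python
import sys


def build_constraint_url(airflow_version: str, python_version: str) -> str:
    """Return the constraints file URL for the given Airflow and Python versions."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Major.minor version of the interpreter running this code
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
constraint_url = build_constraint_url("2.1.2", python_version)
print(constraint_url)
```

If you use a different Airflow version, pass it as the first argument instead of `"2.1.2"`.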


Now install that version of Airflow with pip, using the constraints file from the Airflow GitHub repo to pin compatible dependency versions:

```
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```


Now you can start using Airflow. It is a good idea to initialize your database first. This database holds Airflow's metadata, including information about the DAGs you create:

`airflow db init`

Then create a user with the Admin role, which you will use to log in to the UI:

```
airflow users create \
    --username <your_username> \
    --firstname <your_firstname> \
    --lastname <your_lastname> \
    --role Admin \
    --email <your_email>
```

Let's check everything went right. In the terminal, run:

`airflow webserver --port 8080`

This starts a web server on your localhost at port 8080. Note that the web server alone does not run or monitor your scheduled DAGs. For that, you need to start the scheduler, so open a new terminal and run:

`airflow scheduler`

If you receive a warning message, don't worry; it won't affect how Airflow runs. Now we are ready to start using the UI.
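If you would like to confirm from code that the scheduler is up, the web server exposes a `/health` endpoint that reports the scheduler status as JSON. The sketch below is a hedged illustration: `scheduler_is_healthy` and `fetch_health` are hypothetical helper names, the sample payload values are made up, and the base URL assumes the port used above.

```python
import json
from urllib.request import urlopen


def scheduler_is_healthy(health: dict) -> bool:
    """Return True if a /health payload reports a healthy scheduler."""
    return health.get("scheduler", {}).get("status") == "healthy"


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Download and decode the web server's /health endpoint."""
    with urlopen(f"{base_url}/health", timeout=5) as response:
        return json.load(response)


# Illustrative payload in the shape the endpoint returns (values are made up)
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}
print(scheduler_is_healthy(sample))  # False: the scheduler is not reporting in
```

With both the web server and the scheduler running, `scheduler_is_healthy(fetch_health())` should return `True`.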

If you open your browser and visit `localhost:8080`, you should see something like this:

![](images/Airflow.png)

The image above shows the Airflow UI. Here you can see the DAGs that have been created. So far, you will only see some examples and tutorials created by the Airflow team. Let's explore it a little.

# Airflow UI

Inside the UI, you can explore the metadata of the DAGs, such as the name (or ID), the owner, the status of previous runs of the whole DAG or of specific tasks inside the DAG, its frequency (in the Schedule column), and when it was last run.

You can see more details by clicking on the DAG. Let's look at the `example_bash_operator` DAG.

<p align="center">
    <img src="images/AirFlow2.png" width="500"/>
</p>

In the DAG page you can see its structure, the average time each task took to run, the Gantt chart of the DAG to check whether tasks overlap, the details of the DAG, and the code that generated the DAG. We haven't run this DAG yet, so there is no information about previous runs. We can, however, take a look at the code. Before that, let's look at the `Graph View` tab, which contains the same information as the `Tree View` tab, but rearranged:

<p align="center">
    <img src="images/AirFlow4.png" width="500"/>
</p>



Observe that we have several nodes, each one representing a task. You can also observe their dependencies. For example, `run_after_loop` won't start until all the `runme_x` tasks have finished.
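The dependency rule shown in the graph can be sketched in plain Python, without Airflow. The dictionary below maps each task ID of the example DAG to its upstream tasks; `ready_tasks` is a hypothetical helper, and the sketch deliberately ignores task states such as 'skipped' or 'failed', which Airflow also takes into account.

```python
# Upstream tasks of each node in the example_bash_operator graph
upstream = {
    "runme_0": set(),
    "runme_1": set(),
    "runme_2": set(),
    "also_run_this": set(),
    "this_will_skip": set(),
    "run_after_loop": {"runme_0", "runme_1", "runme_2"},
    "run_this_last": {"run_after_loop", "also_run_this", "this_will_skip"},
}


def ready_tasks(upstream, finished):
    """Return the unfinished tasks whose upstream tasks have all finished."""
    return sorted(
        task
        for task, deps in upstream.items()
        if task not in finished and deps <= finished
    )


print(ready_tasks(upstream, finished={"runme_0", "runme_1"}))
# ['also_run_this', 'runme_2', 'this_will_skip']  (run_after_loop still waits)
print(ready_tasks(upstream, finished={"runme_0", "runme_1", "runme_2"}))
# ['also_run_this', 'run_after_loop', 'this_will_skip']
```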

Let's run a single task to see how it works.

1. In the Airflow UI enable the `example_bash_operator` DAG. 
2. Click the DAG to see its status. You should see two runs. This is because (as we will see later) the start date of these examples is set to two days ago. The schedule runs once every day, so two runs make sense!
3. Inside those runs, tasks have different statuses. In this case, we can see 'success' and 'skipped'. Don't worry, they are meant to be skipped.
<p align="center">
    <img src="images/AirFlow4.png" width="500"/>
</p>

4. Let's see its flow by triggering an event. First click Auto-refresh to see updates in real time, and then, click the Play button:

<p align="center">
    <img src="images/AirFlow_clip.gif" width="500"/>
</p>

Pretty cool, isn't it? We can also see the duration of each task and when each run took place, but I will let you explore that in the UI. For now, let's take a look at the code. If you click on the `Code` tab, you will see this:


```python
"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:

    run_this_last = DummyOperator(
        task_id='run_this_last',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

if __name__ == "__main__":
    dag.cli()
```

If a task prints something to the console, we can check it in the Log tab of that task. For example, look at the `also_run_this` task: it is a BashOperator that prints `run_id={{ run_id }} | dag_run={{ dag_run }}`, with the two templated values filled in at run time. Go to the `Graph View` tab and click on the `also_run_this` task, and in the next window click `Log`. Observe the output:

<p align="center">
    <img src="images/AirFlow_log.png" width="500"/>
</p>
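The `{{ ... }}` placeholders in `bash_command` are templates that Airflow fills in before running the command. Airflow uses Jinja for this; the snippet below is only a toy stand-in built on a regular expression, with a hypothetical `render` helper and made-up context values, to show the idea of the substitution.

```python
import re


def render(template: str, context: dict) -> str:
    """Replace each {{ name }} placeholder with context[name] (toy stand-in for Jinja)."""

    def substitute(match):
        return str(context[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


bash_command = 'echo "run_id={{ run_id }} | dag_run={{ dag_run }}"'

# Hypothetical values; Airflow supplies the real ones for each run
context = {
    "run_id": "manual__2021-08-08T15:30:00+00:00",
    "dag_run": "<DagRun example_bash_operator>",
}
print(render(bash_command, context))
```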

This was a simple walkthrough to show the Airflow UI. You saw that:

- The workflow is represented by a DAG
- Each node in the DAG corresponds to a task
- Each DAG has a schedule that sets the frequency of runs
- Tasks can be triggered by previous tasks
- Each task corresponds to an operator object
- We saw BashOperator, which executes a bash command
- We saw DummyOperator, which according to the documentation is an _'Operator that does literally nothing. It can be used to group tasks in a DAG.'_
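The link between the schedule and the two runs seen earlier can be sketched with the standard library. Airflow triggers a scheduled run once the interval it covers has ended, so a daily schedule with a start date of midnight two days ago has two completed intervals. `completed_intervals` is a hypothetical helper and the "current time" below is an arbitrary assumption.

```python
from datetime import datetime, timedelta


def completed_intervals(start_date, interval, now):
    """Return the start of every schedule interval that has fully elapsed by `now`."""
    starts = []
    current = start_date
    while current + interval <= now:
        starts.append(current)
        current += interval
    return starts


# Hypothetical "current time"; days_ago(2) resolves to midnight two days earlier
now = datetime(2021, 8, 8, 15, 30)
start_date = datetime(2021, 8, 6)

for start in completed_intervals(start_date, timedelta(days=1), now):
    print(start.date())
# 2021-08-06
# 2021-08-07
```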

We will see how to create more operators, for example, a PythonOperator, in the next section. First, let's get some practice defining a DAG with the operators we have seen so far.

# Creating Your First DAG

First of all, make sure you have followed all the steps so far. If that's the case, you should have a folder named airflow in your home directory. _Check it by running the following cell. If no error is thrown, you are good to go_

In [None]:
from os.path import expanduser
import os

home = expanduser("~")
airflow_dir = os.path.join(home, 'airflow')
assert os.path.isdir(airflow_dir)

Inside that directory, you have to add a new folder named `dags`. Airflow looks in that directory for the DAGs you define in Python. The example DAGs you have been using ship with the installed Airflow package, but new DAGs you create should be placed in `~/airflow/dags/`. _You can change the path where Airflow looks for new DAGs with the `dags_folder` setting in the airflow.cfg file_

In [1]:
from airflow.models import DAG

In [2]:
import os
from os.path import expanduser
from pathlib import Path

# Create ~/airflow/dags if it does not exist yet
home = expanduser("~")
airflow_dir = os.path.join(home, 'airflow')
Path(f"{airflow_dir}/dags").mkdir(parents=True, exist_ok=True)

The Python files you create have to be stored in that folder. Each file should contain a DAG with the desired arguments. You can pass arguments directly to the `DAG` context manager, and pass defaults shared by all tasks in a dictionary (`default_args`).

Inside the context manager, simply define the tasks; don't implement any other logic there. As seen above, tasks are defined by operators.

In [None]:
from airflow.models import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated in Airflow 2

In [None]:

default_args = {
    'owner': 'Ivan',
    'depends_on_past': False,
    'email': ['ivan@theaicore.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2022, 1, 1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'trigger_rule': 'all_success'
}


In [None]:
with DAG(dag_id='test_dag',
         default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False,
         tags=['test']
         ) as dag:
    # Define the tasks. Here we are going to define only one bash operator
    test_task = BashOperator(
        task_id='write_date_file',
        bash_command='cd ~/Desktop && date >> ai_core.txt',
        dag=dag)
    

This example can be found in the `examples` folder, under the name `dag_test.py`. Copy the example to your `dags` folder in your airflow directory.
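The copy step can also be done from Python. This is a minimal sketch, assuming you run it from the directory that contains the `examples` folder and that your dags folder is `~/airflow/dags`; `install_dag` is a hypothetical helper name.

```python
import shutil
from pathlib import Path


def install_dag(source: Path, dags_dir: Path) -> Path:
    """Copy a DAG file into the dags folder, creating the folder if needed."""
    dags_dir.mkdir(parents=True, exist_ok=True)
    return Path(shutil.copy(source, dags_dir))


# Assumes the current directory contains the `examples` folder
source = Path("examples") / "dag_test.py"
if source.exists():
    print(install_dag(source, Path.home() / "airflow" / "dags"))
```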

Once the file is in the `dags` folder, you can activate the DAG by unpausing it with the following command (if you haven't started the scheduler yet, run `airflow scheduler -D`):

`airflow dags unpause test_dag`

If the new DAGs do not appear in the UI, you can add them by running the following command again:

`airflow db init`

You can then manage them in the UI.

## Task Dependencies

You just created a task, but in a workflow you will probably need more than one, so let's add a few more. If you only define the tasks, they will run one at a time in no specific order (you can change how they are executed by changing the executor in the airflow.cfg file). However, you can specify how they are ordered by setting the dependencies between them.

Setting dependencies is quite simple. You can define the tasks and then 'connect' them using the bit-shift operators. For example, if you want to run `task_1` after `task_0`, you can do it like this:

`task_0 >> task_1` or `task_0.set_downstream(task_1)` or `task_1 << task_0` or `task_1.set_upstream(task_0)`.

You can see that there are many ways to set the dependencies, so just pick the one that works for you.

Let's say that you want to run both `task_1` and `task_2` after `task_0` has finished. You can do it like this:

`task_0 >> [task_1, task_2]`

Another thing you might want to do is run `task_2` only when both `task_0` and `task_1` have finished. You can do it like this:
```
task_0 >> task_2
task_1 >> task_2
```

Finally, you can also set sequential dependencies between tasks. For example, if you want to run `task_2` after `task_1` has finished, and `task_1` after `task_0` has finished, you can do it like this:

`task_0 >> task_1 >> task_2`
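To see why these forms are interchangeable, here is a toy sketch of how the bit-shift operators can be mapped onto `set_downstream` and `set_upstream`. The `Task` class is a hypothetical stand-in written for this illustration, not Airflow's implementation. Airflow also accepts a list on the left-hand side, which the sketch mimics with `__rrshift__`.

```python
class Task:
    """Toy stand-in for an Airflow operator that only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in others if isinstance(others, list) else [others]:
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in others if isinstance(others, list) else [others]:
            other.set_downstream(self)

    def __rshift__(self, others):  # task >> others
        self.set_downstream(others)
        return others  # returning the right-hand side allows chaining

    def __lshift__(self, others):  # task << others
        self.set_upstream(others)
        return others

    def __rrshift__(self, others):  # [a, b] >> task
        self.set_upstream(others)
        return self


task_0, task_1, task_2, task_3 = (Task(f"task_{i}") for i in range(4))

task_0 >> [task_1, task_2]  # fan-out: both run after task_0
[task_1, task_2] >> task_3  # fan-in: same effect as two separate lines

print(sorted(task_0.downstream))  # ['task_1', 'task_2']
print(sorted(task_3.upstream))    # ['task_1', 'task_2']
```

Because `>>` returns its right-hand side, a chain such as `a >> b >> c` is evaluated as `a >> b` followed by `b >> c`.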

The example below shows a DAG with four tasks:

1. date_task: A BashOperator that appends the current date into a file
2. add_task: A BashOperator that stages the file created by date_task
3. commit_task: A BashOperator that commits the file staged by add_task
4. push_task: A BashOperator that pushes the committed file to a remote repository

In [None]:
from airflow.models import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated in Airflow 2

default_args = {
    'owner': 'Ivan',
    'depends_on_past': False,
    'email': ['ivan@theaicore.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'start_date': datetime(2020, 1, 1), # With a start_date in the past, Airflow backfills the missed runs unless catchup=False
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2022, 1, 1),
}
with DAG(dag_id='test_dag_dependencies',
         default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False,
         tags=['test']
         ) as dag:
    # Define the tasks. Here we define four bash operators
    date_task = BashOperator(
        task_id='write_date',
        bash_command='cd ~/Desktop/Weather_Airflow && date >> date.txt',
        dag=dag)
    add_task = BashOperator(
        task_id='add_files',
        bash_command='cd ~/Desktop/Weather_Airflow && git add .',
        dag=dag)
    commit_task = BashOperator(
        task_id='commit_files',
        bash_command='cd ~/Desktop/Weather_Airflow && git commit -m "Update date"',
        dag=dag)
    push_task = BashOperator(
        task_id='push_files',
        bash_command='cd ~/Desktop/Weather_Airflow && git push',
        dag=dag)
    
    date_task >> add_task >> commit_task
    add_task >> push_task
    commit_task >> push_task

Observe the last part of the DAG, where the dependencies between the tasks are set. Of course, you could simply chain them all in sequence, but this example shows how to set the dependencies in different ways.

<p align="center">
    <img src="images/AirFlow_Dependencies.png" width="500"/>
</p>
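One thing worth noticing in this graph is that `push_files` already waits for `commit_files`, which in turn waits for `add_files`, so the direct `add_task >> push_task` line does not change the execution order. The plain-Python sketch below checks this on the edges declared in the DAG; `reachable` and `redundant_edges` are hypothetical helpers, not Airflow functions.

```python
# Edges as declared at the end of the DAG above (task IDs)
edges = [
    ("write_date", "add_files"),
    ("add_files", "commit_files"),
    ("add_files", "push_files"),
    ("commit_files", "push_files"),
]


def reachable(edges, start, goal):
    """Return True if `goal` can be reached from `start` by following edges."""
    stack, seen = [start], set()
    while stack:
        node = stack.pop()
        if node == goal:
            return True
        if node not in seen:
            seen.add(node)
            stack.extend(dst for src, dst in edges if src == node)
    return False


def redundant_edges(edges):
    """Return the edges whose ordering is already implied by a longer path."""
    return [
        edge
        for edge in edges
        if reachable([e for e in edges if e != edge], edge[0], edge[1])
    ]


print(redundant_edges(edges))  # [('add_files', 'push_files')]
```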

After running it, you will see that your repo is being updated every minute (which might be confusing, but this is a demo, so we don't care)

<p align="center">
    <img src="images/AirFlow_GitHub.png" width="500"/>
</p>

## Try it out

1. Create a new remote repository in your GitHub account. 
2. You will eventually use it for storing weather data, so name your repository accordingly.
3. Clone the repository to your local machine.
4. Copy the DAG file `dag_test_dependencies.py` to the folder `dags` in your local machine.
5. Change the file according to the name of your repository and the directory you cloned it to.
6. Unpause the DAG by running `airflow dags unpause test_dag_dependencies` (the `dag_id` defined in the file) or by switching on its toggle in the `DAGs` list in the UI.

As you start creating DAGs, you might forget which ones are active. The good thing is that Airflow has many commands to check your work from the command line. If you type `airflow -h`, you can see all the commands.

In [1]:
%%bash
airflow -h

usage: airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery         Celery components
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      jobs           Manage jobs
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      sync-p

Thus, you can list your DAGs by running:

In [1]:
%%bash
airflow dags list

dag_id                                  | filepath                                                                                                    | owner   | paused
aicore_test                             | ai_test.py                                                                                                  | airflow | True  
aicore_test2                            | ai_test2.py                                                                                                 | airflow | True  
example_bash_operator                   | /opt/miniconda3/lib/python3.9/site-packages/airflow/example_dags/example_bash_operator.py                   | airflow | False 
example_branch_datetime_operator_2      | /opt/miniconda3/lib/python3.9/site-packages/airflow/example_dags/example_branch_datetime_operator.py        | airflow | True  
example_branch_dop_operator_v3          | /opt/miniconda3/lib/python3.9/site-packages/airflow/example_dags/example_branch_python_dop_operator_3.py    | air
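If you capture this table as text, a few lines of Python are enough to pick out the DAGs that are not paused. This is a sketch under assumptions: `parse_dags_table` is a hypothetical helper, and the sample reuses the first rows of the output above with the columns and the last file path shortened.

```python
def parse_dags_table(text):
    """Parse pipe-separated rows from `airflow dags list` into dictionaries."""
    lines = [line for line in text.splitlines() if "|" in line]
    header = [cell.strip() for cell in lines[0].split("|")]
    rows = []
    for line in lines[1:]:
        cells = [cell.strip() for cell in line.split("|")]
        if len(cells) == len(header):  # skip truncated or malformed rows
            rows.append(dict(zip(header, cells)))
    return rows


# First rows of the output above, with the columns shortened
sample = """
dag_id                | filepath                 | owner   | paused
aicore_test           | ai_test.py               | airflow | True
aicore_test2          | ai_test2.py              | airflow | True
example_bash_operator | example_bash_operator.py | airflow | False
"""

active = [row["dag_id"] for row in parse_dags_table(sample) if row["paused"] == "False"]
print(active)  # ['example_bash_operator']
```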

# Python Operators

We have seen that you can run bash commands in each task. Airflow is not limited to these commands: you can also create PythonOperators to do anything you want, as long as it is contained in a Python function. Let's say that you want to create a PythonOperator that will extract information about events that took place 'On this day' in the past.

The first thing you have to do is create a function that uses requests and bs4 to download that information from Wikipedia.

In [2]:
from bs4 import BeautifulSoup
import requests

def get_ul(url: str):
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    return soup.find('ul')

def get_today_events(url: str):
    ul_soup = get_ul(url)
    for li in ul_soup.find_all('li'):
        if li.find('h3'):
            print(li.find('h3').text)
            print(li.find('p').text)

get_ul('https://en.wikipedia.org/wiki/Wikipedia:On_this_day/Today')

<ul><li><a href="/wiki/1576" title="1576">1576</a> – The <a href="/wiki/Cornerstone" title="Cornerstone">cornerstone</a> of Danish astronomer <a href="/wiki/Tycho_Brahe" title="Tycho Brahe">Tycho Brahe</a>'s observatory <b><a href="/wiki/Uraniborg" title="Uraniborg">Uraniborg</a></b> was laid on the island of <a href="/wiki/Ven_(Sweden)" title="Ven (Sweden)">Hven</a>.</li>
<li><a href="/wiki/1918" title="1918">1918</a> – The <b><a href="/wiki/Battle_of_Amiens_(1918)" title="Battle of Amiens (1918)">Battle of Amiens</a></b> began in <a href="/wiki/Amiens" title="Amiens">Amiens</a>, France, marking the start of the <a href="/wiki/Allies_of_World_War_I" title="Allies of World War I">Allied Powers</a>' <a href="/wiki/Hundred_Days_Offensive" title="Hundred Days Offensive">Hundred Days Offensive</a> through the German front lines that ultimately led to the end of <a href="/wiki/World_War_I" title="World War I">World War I</a>.</li>
<li><a href="/wiki/1969" title="1969">1969</a> – At a <a hre