Modern Big Data Tool (Apache Airflow)

# Apache Airflow

Airflow is a workflow management platform for scheduling and monitoring workflows.

Data workflows, such as sequential and scheduled tasks, raise several problems we have to deal with:
- Dependent tasks
- Scheduled jobs
- Management and monitoring
- Error handling, e.g. sending an email when a task fails

<img src="./asset/dag-example.png" title='https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a' style="width: 700px;"/>

There are a number of ways to run data workflows
(a detailed comparison is linked at the bottom of the report):
- Custom scripts
- Cron, where each task is scheduled based on fixed times and the expected run time of each task
- Oozie, Luigi
- **Airflow**

## Features

- Monitoring
- Scheduler that fires tasks when they are ready to run
- Rich CLI (command line interface), e.g. to test, backfill, and describe tasks
- Easier testing, e.g. `airflow test <dagId> <taskId> <executionDate>`
- Easy-to-use UI
    - Explore your DAGs definition
    - Task status, summary, dependencies, progress
    - Metadata
    - Logs
    - Task stats e.g. tasks latencies over time
    - Enable / disable DAGs via the UI
    - etc.
- Task hierarchy / Dependency management
- Task logs
- Re-run historical tasks, plus task actions such as "mark" and "force"
- Historical pipeline runs / backfill concept
- Strong open-source community, e.g. the Airflow Gitter chatroom is very active
- Many extensions
- Support for many big data ecosystem tools, e.g. S3, Druid, HDFS, Hive, Sqoop, etc., which is a great benefit if you work in a big data ecosystem
- Scalable, designed to run on a distributed task queue
- etc.

## Use cases of Airflow at Airbnb

1. Data warehousing: ETL, cleansing, organizing, data quality checks, and publishing data into the data warehouse
2. Growth analytics: compute metrics around, e.g., guest and host engagement
3. Experimentation: compute A/B testing experimentation results
4. Email targeting
5. Sessionization: compute clickstream and time-spent datasets
6. Search: compute search-ranking-related metrics
7. Data infrastructure maintenance: database scrapes, folder cleanup

<img src="./asset/example-ui-graph.png" style="width: 800px;" title="https://medium.com/airbnb-engineering/airflow-a-workflow-management-platform-46318b977fd8"/>
<img src="./asset/example-ui-tree.png" style="width: 800px;" title="https://medium.com/airbnb-engineering/airflow-a-workflow-management-platform-46318b977fd8"/>

# Setup & installation 

## Virtual machine (VM)

Airflow only requires `Python` and `pip`, so any system will do as long as both can be installed. Based on our previous exploration, however, we recommend a Linux distribution such as CentOS.

In this project, we're going to use
- CentOS 6
- Python 2
- A MongoDB server, already installed

The MongoDB server is only used by the example DAGs;
it is not the main focus of this project.

### Create Firewall rules
<img src="./asset/firewall-port.png" style="width: 800px;"/>

### Create VM instance

Create a new VM from the "centos-basic-image" image.

If you don't have the "centos-basic-image" image, you only need to install `Python 2` and `pip`.

<img src="./asset/create-vm.png" style="width: 600px;"/>

### Install MongoDB (already covered in class)

### Access the VM instance

For example: `ssh admin@airflow-project`

## Add environment variables

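Add these environment variables to `~/.bash_profile`. `AIRFLOW_HOME` is where Airflow stores its configuration and, when the default database is used, its SQLite database. `SLUGIFY_USES_TEXT_UNIDECODE=yes` makes the installer use the non-GPL `text-unidecode` package instead of `unidecode`.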

```
export SLUGIFY_USES_TEXT_UNIDECODE=yes
export AIRFLOW_HOME=~/airflow
```

Then reload the profile with `source ~/.bash_profile`.

![](./asset/env-var.png)

## Install modules

```
sudo yum install gcc
sudo yum install gcc-c++
sudo pip2 install --upgrade pip

sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip2 install apache-airflow
sudo pip2 install flask_bcrypt       # required by the password_auth web UI backend
sudo pip2 install ccxt pymongo       # used by the example DAGs

# initialize Airflow's metadata database
# (this also creates $AIRFLOW_HOME, i.e. /home/admin/airflow)
airflow initdb
# check the installed version
airflow version
```

## Set up authentication for the Airflow web UI

### Update the authentication setting

```
nano /home/admin/airflow/airflow.cfg

# in the [webserver] section, find
authenticate = False
# and change it to
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
```
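To double-check the edit, the sketch below reads `airflow.cfg` with Python 3's standard `configparser` and reports whether the password backend is enabled. `webserver_auth_enabled` is a hypothetical helper, not part of Airflow, and it assumes both keys live in the `[webserver]` section as shown above.

```python
import configparser

PASSWORD_BACKEND = "airflow.contrib.auth.backends.password_auth"


def webserver_auth_enabled(cfg_path):
    """Return True if [webserver] in airflow.cfg enables the password backend."""
    # interpolation=None: airflow.cfg contains literal '%' characters (log formats)
    # strict=False: tolerate a duplicated key instead of raising an error
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.read(cfg_path)  # a missing file is silently skipped
    if not parser.has_section("webserver"):
        return False
    enabled = parser.getboolean("webserver", "authenticate", fallback=False)
    backend = parser.get("webserver", "auth_backend", fallback="")
    return enabled and backend.strip() == PASSWORD_BACKEND
```

For example, `webserver_auth_enabled(os.path.expanduser("~/airflow/airflow.cfg"))` should return `True` after the change. The webserver reads its configuration at startup, so restart it for the new setting to take effect.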

### Create a new user

The default database is SQLite, which is fine for this project. In production you will probably want MySQL or PostgreSQL instead, not least because SQLite only works with the `SequentialExecutor`, which runs one task at a time.

The Python code below creates a new user in the SQLite database:

```
python2

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin2'
user.email = 'admin@example.com'
user.password = 'qwejqwhisrw948'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```

<img src="./asset/new-user.png" style="width: 600px;"/>

## Upload example files

Upload the `dags` folder, which contains the example DAG files we prepared for the demo, into `/home/admin/airflow`. The DAG folder location can be changed with the `dags_folder` setting in `airflow.cfg`.

You should end up with a structure like this:
```
airflow                     # root directory
├── dags                    # contains all DAG files
│   ├── ...
│   ├── my_tutorial.py      # DAG definitions
│   ├── my_simple_dag.py
│   └── my_cryptocurrency.py
├── logs                    # logs for the various tasks that are run
├── airflow-webserver.pid
├── airflow.cfg             # global configuration for Airflow
├── airflow.db              # SQLite database Airflow uses internally to track the status of each DAG
└── unittests.cfg
```

## Start

Open two terminals:
1. In the first, start the scheduler: `$ airflow scheduler`
2. In the second, start the web UI: `$ airflow webserver --port 8080`

You should now be able to open the Airflow web UI on port 8080 and log in with the user created earlier:
- user: `admin2`
- pass: `qwejqwhisrw948`

![](./asset/login-page.png)

# Concepts and components

## Airflow scheduler

A monitoring process that runs continuously and triggers task execution based on `schedule_interval` and `start_date`.

For example

```
start_date: 2018-09-24
schedule_interval: */1 * * * *
```

The task is then triggered every minute:
- 2018-09-24 00:01:00
- 2018-09-24 00:02:00
- 2018-09-24 00:03:00, and so on
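In Airflow 1.x each run is triggered at the end of the interval it covers, so the run triggered at 00:01:00 carries the `execution_date` 2018-09-24 00:00:00. The Python 3 sketch below models that convention with a hypothetical `scheduled_runs` helper. As a simplifying assumption it treats `*/1 * * * *` as a fixed one-minute `timedelta`; Airflow itself evaluates cron expressions with the `croniter` library.

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, interval, count):
    """Yield (execution_date, trigger_time) pairs for a fixed-interval schedule.

    A run covering [execution_date, execution_date + interval) is only
    triggered once that interval has ended.
    """
    execution_date = start_date
    for _ in range(count):
        yield execution_date, execution_date + interval
        execution_date += interval


# '*/1 * * * *' fires every minute, modeled here as a 1-minute timedelta
for execution_date, trigger_time in scheduled_runs(
        datetime(2018, 9, 24), timedelta(minutes=1), 3):
    print("execution_date={}  triggered at {}".format(execution_date, trigger_time))
```

Each printed line pairs an `execution_date` with the minute after it, which is why the first trigger time above is 00:01:00 rather than 00:00:00.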


## DAG / Workflows

An Airflow workflow is designed as a directed acyclic graph (DAG).

For example, a "home-price-scraper" process might look like this:

```
get_data_from_goodhome.com --> cleaning -┐
get_data_from_home.com     --> cleaning  |-> merged -> save into db
get_data_from_homedd.com   --> cleaning -┘
```
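What makes a graph like this a valid workflow is that it has no cycles, so there is always an order in which every task runs after its upstream tasks. The Python 3 sketch below finds such an order with Kahn's topological sort, a standard algorithm; it is not Airflow's internal code, and the task ids are hypothetical names for the home-price example.

```python
from collections import deque


def topological_order(dag):
    """Return task ids in an order where every task comes after its upstream tasks.

    `dag` maps each task id to the list of task ids directly downstream of it.
    Raises ValueError if the graph has a cycle, i.e. it is not a DAG.
    """
    # Count how many upstream tasks each task is waiting for.
    indegree = {task: 0 for task in dag}
    for downstream in dag.values():
        for task in downstream:
            indegree[task] = indegree.get(task, 0) + 1

    # Start with the tasks that have no upstream dependencies.
    ready = deque(task for task, count in indegree.items() if count == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # "Finishing" a task releases its downstream tasks.
        for nxt in dag.get(task, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(indegree):
        raise ValueError("cycle detected: the graph is not a DAG")
    return order


# Hypothetical task ids for the home-price example above.
home_price_scraper = {
    "get_goodhome": ["clean_goodhome"],
    "get_home": ["clean_home"],
    "get_homedd": ["clean_homedd"],
    "clean_goodhome": ["merge"],
    "clean_home": ["merge"],
    "clean_homedd": ["merge"],
    "merge": ["save_to_db"],
    "save_to_db": [],
}
print(topological_order(home_price_scraper))
```

The three `get_*` tasks come first because nothing is upstream of them, which is also why a scheduler may run them in parallel; `merge` only becomes ready after all three cleaning tasks.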

More examples

![](./asset/dag.png)
![](./asset/dag2.png)

### Dependency

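Dependencies are defined in code with the bitshift operators: `a >> b` makes `b` run after `a`, and `a << b` makes `a` run after `b`.

#### No dependencies: tasks can be executed in parallel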

<img src="./asset/dependency-none.png" style="width: 200px;"/>

#### Sequential

For example, `opr_hello >> opr_greet >> opr_sleep >> opr_respond` runs the four tasks one after another:
<img src="./asset/dependency1.png" style="width: 800px;"/>

#### Others

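For example, `opr_hello >> opr_greet >> opr_sleep << opr_respond`. The chain is evaluated left to right and each operator returns its right-hand task, so `opr_sleep` waits for both `opr_greet` and `opr_respond`: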
<img src="./asset/dependency2.png" style="width: 800px;"/>
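This chaining behavior comes from operator overloading: Airflow's operators implement `>>` and `<<` via `set_downstream` and `set_upstream` and return the right-hand operand. The toy `Task` class below is hypothetical (not an Airflow class) and mimics only that behavior, to show which dependencies the "Others" example creates.

```python
class Task(object):
    """Toy stand-in for an Airflow operator (hypothetical, not Airflow's class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # ids of tasks that must finish before this one

    def __rshift__(self, other):
        # self >> other: other runs after self; return other so chains continue
        other.upstream.add(self.task_id)
        return other

    def __lshift__(self, other):
        # self << other: self runs after other; also returns the right-hand task
        self.upstream.add(other.task_id)
        return other


opr_hello, opr_greet, opr_sleep, opr_respond = (
    Task(name) for name in ("opr_hello", "opr_greet", "opr_sleep", "opr_respond"))

# Python groups this as ((opr_hello >> opr_greet) >> opr_sleep) << opr_respond
opr_hello >> opr_greet >> opr_sleep << opr_respond

for task in (opr_hello, opr_greet, opr_sleep, opr_respond):
    print(task.task_id, "waits for", sorted(task.upstream))
```

The output shows `opr_sleep` waiting for both `opr_greet` and `opr_respond`, while `opr_hello` and `opr_respond` wait for nothing and can start right away.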

## DagRun (DAG runs)

A DagRun is an instance of a DAG for one execution date. Airflow generates DAG runs starting from the `start_date`, one per `schedule_interval` defined in the DAG file.

Once a DAG is active, Airflow continuously checks the database to see whether every DAG run since the `start_date` has run successfully, and automatically schedules any missing runs.
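A simplified model of that check, as a Python 3 sketch: here the "database" is just a set of execution dates that finished successfully, and a run counts as due once its interval has ended. The `missing_runs` function and the dates are hypothetical; the real scheduler works against the `dag_run` table.

```python
from datetime import datetime, timedelta


def missing_runs(start_date, interval, now, completed):
    """Return execution dates whose interval has ended but that have no successful run.

    `completed` is a set of execution dates already recorded as successful.
    A run for execution date d is due once d + interval <= now.
    """
    due = []
    execution_date = start_date
    while execution_date + interval <= now:
        if execution_date not in completed:
            due.append(execution_date)
        execution_date += interval
    return due


# Daily DAG started on 2018-09-24; only the 24th and 26th succeeded so far.
due = missing_runs(datetime(2018, 9, 24), timedelta(days=1),
                   now=datetime(2018, 9, 28, 12),
                   completed={datetime(2018, 9, 24), datetime(2018, 9, 26)})
print([d.strftime("%Y-%m-%d") for d in due])  # ['2018-09-25', '2018-09-27']
```

The run for 2018-09-28 is not listed because its interval has not ended yet at 12:00 that day.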

For example, a DagRun of `my_simple_dag` in which the tasks have no dependencies, so they can run in parallel:

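```
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# `greet` and `respond` are Python functions defined in my_simple_dag.py (not shown);
# the operators are assumed to be created inside that file's `with DAG(...)` block.
opr_hello = BashOperator(task_id='say_Hi', bash_command='echo "Hi!!"')
opr_greet = PythonOperator(task_id='greet', python_callable=greet)
opr_sleep = BashOperator(task_id='sleep_me', bash_command='sleep 5')
opr_respond = PythonOperator(task_id='respond', python_callable=respond)

# No >> / << between the operators, so one DagRun holds four independent task instances:
+--------------------------------+
|             DagRun             |
|                                |
| +------------+  +------------+ |
| | opr_hello  |  | opr_greet  | |
| +------------+  +------------+ |
| +------------+  +------------+ |
| | opr_sleep  |  | opr_respond| |
| +------------+  +------------+ |
+--------------------------------+
```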

## Backfilling

For example, suppose we run a web scraper hourly that fetches data for the current date, e.g. `http://getdata/?date=<current-datetime>`, and the script has been broken since last week. Without help, we would have to re-run it manually for every missed hour.

Airflow can solve this with `start_date` and `schedule_interval`, together with the context Airflow passes to each task (such as the run's execution date, which can be used in place of the current date). For example, when you initialize a DAG on `2016-01-04` with a `start_date` of `2016-01-01`
and a daily `schedule_interval`, Airflow will schedule DAG runs for all the days between 2016-01-01 and 2016-01-04.
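The key to making the scraper backfillable is to build each request from the run's execution date instead of the current time; in Airflow templates this date is available as `{{ ds }}` (formatted `YYYY-MM-DD`). The Python 3 sketch below, with a hypothetical `backfill_urls` helper and the example's placeholder endpoint, lists the URLs a daily DAG would request, assuming one run for each day whose interval has ended by 2016-01-04 00:00.

```python
from datetime import date, timedelta

# Placeholder endpoint from the example above; the date comes from the
# run's execution date, never from date.today().
URL_TEMPLATE = "http://getdata/?date={ds}"


def backfill_urls(start, end, step=timedelta(days=1)):
    """Return one URL per execution date in the half-open range [start, end)."""
    urls = []
    day = start
    while day < end:
        urls.append(URL_TEMPLATE.format(ds=day.isoformat()))
        day += step
    return urls


for url in backfill_urls(date(2016, 1, 1), date(2016, 1, 4)):
    print(url)
```

Because every URL depends only on its execution date, re-running an old DAG run fetches exactly the data that run missed, no matter when it is executed.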

## Operator

An operator describes a certain type of task; when instantiated, it becomes a task, i.e. a node in the DAG.

There are three main types of operators:
- Operators that perform an action, or tell another system to perform an action, e.g. `BashOperator`, `PythonOperator`, `SimpleHttpOperator`, `MySqlOperator`, etc.
- Transfer operators, which move data from one system to another, e.g. `HiveToMySqlTransfer`, `MsSqlToHiveTransfer`, etc.
- Sensors, which keep running until a certain criterion is met.
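A sensor's "keep running" behavior is essentially a polling loop. The sketch below is a toy Python 3 version: its `poke_interval` and `timeout` parameters mirror the names (and the 60-second and 7-day defaults) of Airflow 1.10's `BaseSensorOperator`, but `run_sensor` itself is hypothetical and raises the built-in `TimeoutError` rather than Airflow's own exception.

```python
import time


def run_sensor(poke, poke_interval=60, timeout=60 * 60 * 24 * 7):
    """Call `poke()` until it returns True; return how many pokes it took.

    Toy model of a sensor (hypothetical helper, not Airflow's API).
    Raises TimeoutError if the criterion is still unmet after `timeout` seconds.
    """
    started = time.monotonic()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if time.monotonic() - started >= timeout:
            raise TimeoutError("criterion not met within {} seconds".format(timeout))
        time.sleep(poke_interval)


# A criterion that is met on the third check, polled without waiting.
checks = iter([False, False, True])
print(run_sensor(lambda: next(checks), poke_interval=0))  # 3
```

In a real DAG the `poke` callable would check something external, such as whether a file or partition exists, and the sensor task only succeeds once that check passes.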

# Database

The metadata tables stored in the Airflow database (e.g. `airflow.db`):

<img src="./asset/airflowdb-dag.png" style="width: 800px;"/>
<img src="./asset/airflowdb-dag-run.png" style="width: 800px;"/>
<img src="./asset/airflowdb-dag-stats.png" style="width: 800px;"/>
<img src="./asset/airflowdb-job.png" style="width: 800px;"/>
<img src="./asset/airflowdb-log.png" style="width: 800px;"/>
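Because the default metadata store is a plain SQLite file, it can also be inspected directly. The Python 3 sketch below counts the runs of one DAG per state; `dag_run_states` is a hypothetical helper, it assumes the `dag_run` table has `dag_id` and `state` columns (as in Airflow 1.10), and it opens the file read-only so it never writes to it.

```python
import sqlite3
from collections import Counter


def dag_run_states(db_path, dag_id):
    """Count the runs of one DAG per state in Airflow's SQLite metadata database.

    Assumes a `dag_run` table with `dag_id` and `state` columns (Airflow 1.10 schema).
    """
    # mode=ro opens the file read-only and fails instead of creating a new empty file
    conn = sqlite3.connect("file:{}?mode=ro".format(db_path), uri=True)
    try:
        rows = conn.execute(
            "SELECT state FROM dag_run WHERE dag_id = ?", (dag_id,)).fetchall()
    finally:
        conn.close()
    return Counter(state for (state,) in rows)
```

For example, `dag_run_states(os.path.expanduser("~/airflow/airflow.db"), "my_simple_dag")` returns a `Counter` keyed by run state (such as `success`, `running`, or `failed`).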

# Web UI

## Example

<img src="./asset/ui-dashboard-example.png" style="width: 900px;"/>
<img src="./asset/ui-log-example.png" style="width: 900px;"/>
<img src="./asset/ui-task-example.png" style="width: 900px;"/>