# Apache Airflow


https://airflow.apache.org/

An **Orchestrator** (or **Workflow manager**) allows us to create **Data Pipelines** and describe every step of our data flow: from **where** to where, **what**, **when**, and **how**. It can run multiple tasks in any sequence, not only the classical **ETL** steps.

**Apache Airflow** is one of the most popular open-source workflow management systems for managing data pipelines. It is a platform to programmatically author, schedule, and monitor workflows as **directed acyclic graphs** (**DAGs**) of tasks. The Airflow scheduler executes our tasks on an array of workers while following the specified dependencies. A **workflow** can be defined as any sequence of steps taken to accomplish a particular goal; when workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
* Airflow's command-line utilities make performing complex surgeries on DAGs a snap.
* The Airflow user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

## Directed Acyclic Graphs (DAGs)

**DAGs** are a special subset of graphs in which the **edges** between **nodes** have a specific direction and no **cycles** exist. By `no cycles exist` we mean that no node can reach itself again by following the edges.

<img src='imgs/DAG.png' alt='DAG' width=40%>

### Nodes
A step or task in the data pipeline process.

### Edges
The dependencies or relationships between nodes.

<img src='imgs/dag_pipeline.png' alt='dag_pipeline' width=60%>


A DAG is a collection of Tasks that are ordered to reflect the functionality, requirements and dependencies of the workflow.


#### Are there real world cases where a data pipeline is not DAG?

It is possible to model a data pipeline that is not a DAG, meaning that it contains a cycle within the process. However, the vast majority of data pipeline use cases can be described as a directed acyclic graph (DAG), and modeling them that way makes the code more understandable and maintainable.

#### Can we have two different pipelines for the same data and can we merge them back together?

Yes. It's common for a data pipeline to take the same dataset, perform two different processes to analyze it, and then merge the results of those two processes back together.
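Both answers can be checked mechanically. The sketch below uses Python's standard-library `graphlib` module (Python 3.9+), not Airflow, and the task names are hypothetical: it models a split-and-merge pipeline as a map from each task to its upstream tasks, derives one valid execution order with a topological sort, and reports a graph that contains a cycle as not being a DAG.

```python
from graphlib import TopologicalSorter, CycleError

# Each key is a task; its value is the set of upstream tasks it depends on.
# "extract" fans out into two independent branches that are merged again.
pipeline = {
    "extract": set(),
    "clean_for_stats": {"extract"},
    "clean_for_ml": {"extract"},
    "merge_results": {"clean_for_stats", "clean_for_ml"},
}


def execution_order(graph):
    """Return one order in which the tasks can run without breaking any edge."""
    return list(TopologicalSorter(graph).static_order())


def is_dag(graph):
    """True if the dependency graph contains no cycle."""
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on a cycle
        return True
    except CycleError:
        return False


print(execution_order(pipeline))
# A cycle: "a" depends on "b" and "b" depends on "a".
print(is_dag({"a": {"b"}, "b": {"a"}}))  # False
```

The two cleaning branches may come out in either order, since neither depends on the other; that freedom is exactly what lets an orchestrator run them in parallel.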

## Components of Airflow

<img src='imgs/airflow_main_components.png' alt='airflow_main_components' width=50%>


### Scheduler

**Scheduler** orchestrates the execution of jobs on a trigger or schedule interval. The Scheduler chooses how to prioritize the running and execution of tasks within the system. 

The scheduler is responsible for monitoring all DAGs and the tasks within them. When dependencies for a task are met, the scheduler triggers the task. Under the hood, the scheduler periodically inspects active tasks to trigger.
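The "trigger a task once its dependencies are met" rule can be sketched as a small stdlib function. This is a simplified illustration that mirrors the default `all_success` trigger rule, not the scheduler's actual code; the task names and the convention that `None` means "not started yet" are assumptions.

```python
# Hypothetical upstream dependencies and current states for one DAG run.
upstream = {
    "extract": [],
    "transform": ["extract"],
    "load": ["transform"],
    "notify": ["extract"],
}
state = {"extract": "success", "transform": None, "load": None, "notify": None}


def runnable_tasks(upstream, state):
    """Tasks not started yet whose upstream tasks have all succeeded."""
    return sorted(
        task
        for task, parents in upstream.items()
        if state[task] is None and all(state[p] == "success" for p in parents)
    )


print(runnable_tasks(upstream, state))  # ['notify', 'transform']
```

`load` stays blocked until `transform` succeeds; each time the scheduler loops, it re-evaluates this condition against the latest states.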

### Executor

**Executor** (also described as the **Work Queue**) is used by the scheduler in most Airflow installations to deliver the tasks that need to run to the Workers. Executors act as the `workstation for tasks` and as a middleman that handles resource allocation and distributes tasks.

There are many options available in Airflow for executors:
* The **SequentialExecutor** is the default executor. It runs a single task at any given time and cannot run tasks in parallel, which makes it useful for a test environment or for debugging deeper Airflow bugs.


* The **LocalExecutor** (single-node architecture) runs tasks in parallel as separate processes on one machine, which makes it a good fit for running Airflow locally or on a single node.


* The **CeleryExecutor** is the preferred way to run a distributed Airflow cluster. It requires a message broker such as Redis or RabbitMQ to coordinate tasks between workers.


* The **KubernetesExecutor** calls the Kubernetes API to create a temporary pod for each task instance to run. Users can pass in custom configurations for each of their tasks.


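* The **DebugExecutor** runs tasks one at a time in a single process and is meant for running and debugging DAGs from an IDE.


* The **DaskExecutor** runs Airflow tasks on a Dask Distributed cluster.


* Scaling Out with Mesos (community contributed)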
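The practical difference between a sequential and a parallel executor can be felt with a toy example. This sketch uses only the standard library: a list comprehension stands in for sequential execution, and a `ThreadPoolExecutor` stands in for parallel execution on one machine. Threads are an assumption chosen for simplicity; the real LocalExecutor uses separate processes. The task function is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def task(name):
    time.sleep(0.1)  # stand-in for real work, e.g. waiting on a database
    return f"{name} done"


tasks = ["t1", "t2", "t3", "t4"]

# Sequential style: one task at a time (about 4 x 0.1 s).
start = time.perf_counter()
sequential = [task(t) for t in tasks]
print(f"sequential: {time.perf_counter() - start:.2f}s")

# Parallel style: several tasks at once on one machine (about 0.1 s).
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(task, tasks))  # map() keeps the input order
print(f"parallel:   {time.perf_counter() - start:.2f}s")

assert sequential == parallel
```

Both runs produce the same results; only the wall-clock time differs, which is why the sequential executor is fine for testing but not for production workloads.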

### Database

The state of the DAGs and their constituent tasks needs to be saved in a **database** so that the scheduler remembers metadata information, such as the last run of a task, and the web server can retrieve this information for an end user. Airflow uses SQLAlchemy, a Python SQL toolkit and Object Relational Mapper (ORM), to connect to the **metadata database**. The main officially supported backends are PostgreSQL and MySQL; SQLite works for development and testing only.

Configurations, connections, credentials, user information, roles, policies, history, and even key-value pair variables are stored in the metadata database. The scheduler parses all the DAGs and stores the state of all tasks in the system. It also stores the relevant metadata, such as schedule intervals, statistics from each run, and their task instances.



### Web Interface

**Web Interface** provides a control dashboard for users and maintainers. It allows users to perform tasks such as stopping and starting DAGs, retrying failed tasks, and configuring credentials. The web interface visualizes the DAGs parsed by the scheduler and is built with the Flask web-development microframework.

It displays the status of the jobs and allows the user to interact with the databases as well as read log files from a remote file store, such as S3, Google Cloud Storage, Azure blobs, etc.

### Worker

**Worker** processes execute the operations defined in each DAG. When work in the queue arrives, the worker will begin to process it.

In most Airflow installations, workers pull from the work queue when they are ready to process a task. When a worker completes the execution of a task, it attempts to pull more work from the queue until there is no further work remaining.
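The pull-until-empty loop described above can be sketched with the standard library. Here a `queue.Queue` stands in for the work queue and a plain function stands in for a worker; both are simplifications (real CeleryExecutor workers, for example, consume from a broker such as Redis or RabbitMQ), and the task IDs are hypothetical.

```python
import queue

# Stand-in for the work queue filled by the scheduler/executor.
work_queue = queue.Queue()
for task_id in ["extract", "transform", "load"]:
    work_queue.put(task_id)


def worker(q, results):
    """Keep pulling tasks until the queue is empty, like a worker draining work."""
    while True:
        try:
            task_id = q.get_nowait()
        except queue.Empty:
            break  # no further work remaining
        results.append(f"ran {task_id}")  # stand-in for executing the task
        q.task_done()


results = []
worker(work_queue, results)
print(results)  # ['ran extract', 'ran transform', 'ran load']
```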

### Order of Operations for an Airflow DAG

<img src='imgs/af_working.png' alt='af_working' width=55%>


* Airflow parses all the DAGs in the background at a specific period. The default period is set using the `processor_poll_interval` config, which is, by default, equal to *one second*.


* Once a DAG file is parsed, DAG runs are created based on the scheduling parameters. Task instances are instantiated for tasks that need to be executed, and their status is set to **SCHEDULED** in the metadata database. 


> ⚠️ Since the parsing takes place periodically, any top-level code, i.e., code written in global scope in a DAG file, will execute when the scheduler parses it. This slows down the scheduler’s DAG parsing, resulting in increased usage of memory and CPU. Therefore, caution is recommended when writing code in the global scope.


* The Airflow Scheduler starts DAGs based on time or external triggers. The scheduler is responsible for querying the database, retrieving the tasks in the **SCHEDULED** state, and distributing them to the executors. 


* Once a DAG is started, the Scheduler looks at the steps within the DAG and determines which steps can run by looking at their dependencies. The Scheduler places runnable steps in the queue, and the state for the task is changed to **QUEUED**.


* Workers pull the **QUEUED** tasks from the queue and execute them, and the task status changes to **RUNNING**.


* As a DAG runs, its tasks transition from one state to another, as shown in the diagram below:

<img src='imgs/transition.png' alt='transition' width=60%>


* When a task finishes, the worker running it marks it as either **success** or **failed**. The scheduler then updates the final status in the metadata database.


* Once all tasks have been completed, the DAG run is complete.
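The life cycle above can be modeled as a small state machine. This is a simplified sketch covering only the states named in this section; Airflow's terminal states are `success` and `failed`, and real task instances have more states (such as `up_for_retry`, `skipped`, and `upstream_failed`) that are deliberately left out here.

```python
# Allowed transitions for the simplified life cycle described above.
ALLOWED = {
    None: {"scheduled"},          # task instance created for a DAG run
    "scheduled": {"queued"},      # scheduler hands it to the executor
    "queued": {"running"},        # a worker picks it up
    "running": {"success", "failed"},
}


def advance(current, new):
    """Move to `new` if the transition is allowed, else raise ValueError."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current!r} -> {new!r}")
    return new


state = None
for nxt in ["scheduled", "queued", "running", "success"]:
    state = advance(state, nxt)
print(state)  # success
```

Skipping a step, for example going straight from `queued` to `success`, raises an error, which reflects the ordering that the scheduler and workers enforce.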

### airflow.cfg

Airflow comes with lots of knobs and levers that can be tweaked to extract the desired performance from an Airflow cluster. For instance: 
* the `processor_poll_interval` config value changes how frequently Airflow parses all the DAGs in the background (since Airflow 2.2 this option is named `scheduler_idle_sleep_time`).


* Another config value, `scheduler_heartbeat_sec`, controls how frequently the Airflow scheduler looks for new tasks to run. A lower value makes the scheduler check for new tasks more often, but places more load on the metadata database.


* Yet another config, `job_heartbeat_sec`, determines the frequency with which task instances listen for external kill signals, e.g., when using the CLI or the UI to clear a task.


* Other configuration reference: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
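Any option in `airflow.cfg` can also be overridden with an environment variable named `AIRFLOW__{SECTION}__{KEY}` (double underscores), and the environment variable takes precedence over the file. The sketch below is a toy reimplementation of that lookup with `configparser`, not Airflow's own `conf` object; the config values are examples only.

```python
import configparser
import os

# A small fragment in the style of airflow.cfg (values are examples only).
CFG = """
[scheduler]
scheduler_heartbeat_sec = 5
job_heartbeat_sec = 5
"""


def get_option(parser, section, key):
    """Environment variable AIRFLOW__{SECTION}__{KEY} wins over the file value."""
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    return os.environ.get(env_name, parser.get(section, key))


parser = configparser.ConfigParser()
parser.read_string(CFG)

print(get_option(parser, "scheduler", "scheduler_heartbeat_sec"))  # 5
os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC"] = "10"
print(get_option(parser, "scheduler", "scheduler_heartbeat_sec"))  # 10
```

Environment-variable overrides are convenient in containerized deployments, where editing `airflow.cfg` inside an image is awkward.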

# Building a Data Pipeline in Airflow

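A DAG is defined in a Python file (e.g. `DAG.py`) placed in Airflow's DAGs folder (`$AIRFLOW_HOME/dags` by default). This file contains the imports for the operators, the DAG configuration such as its name and schedule, and the definition of the tasks together with their dependencies and order.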

<img src='imgs/operator.png' alt='operator' width=20%>


### Operators and Tasks

**Operators** define the atomic steps of work that make up a DAG. Instantiated operators are referred to as **Tasks**.

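* Operators are Python classes that contain the logic to perform a task. Built-in operators live in the `airflow.operators` package (and in provider packages), while custom operators are typically placed in the `plugins` folder; in both cases they are imported and instantiated in the **DAG.py** file.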
 
 
* There are three main types of operators:
    * Operators that perform an action or request another system to perform an action.
    * Operators that transfer data from one system to another.
    * Operators that run until a certain condition or criteria are met, e.g., a particular file lands in HDFS or S3, a Hive partition gets created, or a specific time of the day is reached. These special kinds of operators are also known as **sensors** and can allow part of our DAG to wait on some external system. All sensor operators derive from the `BaseSensorOperator` class.


* All operators are derived from `BaseOperator` and acquire much of their functionality through inheritance. Airflow comes with many Operators that can perform common operations:
    * **PythonOperator** - calls a Python function
    * **PostgresOperator** - executes a SQL command
    * **RedshiftToS3Operator** - unloads data from Redshift to S3
    * **S3ToRedshiftOperator** - copies data from S3 into Redshift
    * **BashOperator** - executes a Bash command
    * **SimpleHttpOperator** - sends an HTTP request
    * **Sensor** - waits for a certain time, file, database row, S3 key, etc.
    * **EmailOperator** - sends an email

A **task** is an instantiation of an **operator** and can be thought of as a unit of work and is represented as a **node** in a DAG. A task can be as trivial as executing a bash date command or as complex as running a remote job on a Hadoop cluster.

A **task instance** represents an actual run of a task. Task instances belong to DAG runs, have an associated `execution_date`, and are instantiated, runnable entities. Task instances go through various states, such as `running`, `success`, `failed`, `skipped`, `up_for_retry`, etc. Each task instance (and task) has a life cycle through which it moves from one state to another.

### Schedules 

A DAG, when executed, is called a **DAG run**. If we have a DAG that is scheduled to run every hour, then each instantiation of the DAG constitutes a DAG run. There could be multiple DAG runs associated with a DAG running at the same time.


* `schedule_interval` defines when the DAG runs. **Schedules** are optional and may be defined with **cron strings** or **Airflow presets**. Airflow provides the following presets:
    * `@once` - Run the DAG once and then never again
    * `@hourly` - Run the DAG every hour
    * `@daily` - Run the DAG every day
    * `@weekly` - Run the DAG every week
    * `@monthly` - Run the DAG every month
    * `@yearly` - Run the DAG every year
    * `None` - Only run the DAG when the user triggers it
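Each repeating preset corresponds to a cron expression, so `schedule_interval="@daily"` and `schedule_interval="0 0 * * *"` describe the same schedule. The mapping below follows the preset table in the Airflow documentation; the `to_cron` helper is a hypothetical illustration, not an Airflow function.

```python
# Cron equivalents of Airflow's schedule presets.
PRESETS = {
    "@once": None,          # no cron equivalent: a single run
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",  # midnight on Sunday
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule):
    """Translate a preset to its cron string; cron strings pass through unchanged."""
    if schedule is None:
        return None  # manual triggering only
    return PRESETS.get(schedule, schedule)


print(to_cron("@daily"))        # 0 0 * * *
print(to_cron("30 6 * * 1-5"))  # 30 6 * * 1-5 (weekdays at 06:30)
```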

### Parameters/Arguments

* While creating a DAG, give it a **name**, a **description**, a **start date**, and an **interval**.
    
    
* `start_date`: If the start date is in the past and `catchup` is enabled (the default), Airflow will run the DAG once for every schedule interval between that start date and the current date.


* `end_date`: Unless we specify an optional end date, Airflow will continue to run our DAGs until we disable or delete the DAG.


* `max_active_runs` limits how many runs of the DAG can be active concurrently.


* `retries` sets how many times a failed task is re-run before it is finally marked as failed.


* `on_success_callback` and `on_failure_callback` are used to trigger actions once the workflow succeeds or fails, respectively. This is useful for sending personalized alerts to an internal team via Slack, email, or any other API call when a workflow or task succeeds or fails.
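Two of these parameters are easy to reason about with plain Python. The sketch below is a simplified model, not Airflow code: `catchup_runs` counts how many complete intervals lie between a past `start_date` and now (each run is triggered at the end of its interval), and `run_with_retries` re-runs a failing callable up to `retries` extra times and then fires a success or failure callback. Both function names are hypothetical, and `retry_delay` is ignored for brevity.

```python
from datetime import datetime, timedelta


def catchup_runs(start_date, now, interval):
    """Number of complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    return (now - start_date) // interval  # timedelta // timedelta -> int


def run_with_retries(fn, retries, on_success=None, on_failure=None):
    """Call fn, retrying up to `retries` extra times, then fire a callback."""
    for attempt in range(retries + 1):
        try:
            result = fn()
        except Exception as exc:
            if attempt == retries:  # out of retries: report and give up
                if on_failure:
                    on_failure(exc)
                raise
        else:
            if on_success:
                on_success(result)
            return result


# A daily DAG started 3.5 days ago has 3 complete intervals to catch up on.
print(catchup_runs(datetime(2024, 1, 1), datetime(2024, 1, 4, 12), timedelta(days=1)))  # 3

attempts = {"n": 0}


def flaky():
    """Hypothetical task that fails twice before succeeding."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"


result = run_with_retries(flaky, retries=2, on_success=lambda r: print("success:", r))
```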

### First DAG (Basic)

```python
from datetime import datetime, timedelta
import logging

from airflow import DAG
# Airflow 2.x import paths (the older `python_operator` / `bash_operator` modules are deprecated)
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# default list of arguments that can be passed to any of the tasks
default_args = {
    'owner': 'dagjoins',
    'depends_on_past': False,
    # a fixed start date; avoid dynamic values such as datetime.now()
    'start_date': datetime(2023, 1, 1),
    'email': ['dagjoins@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

    # Parameters that we have commented out but can be specified if desired.
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success',
}

DEMO_FILE = '~/demo.txt'

# Creating a DAG instance
dag = DAG(
    dag_id="Example_1",
    description="Example DAG 1",
    default_args=default_args,
    schedule_interval="@once",
    # max_active_runs=3,  # a DAG-level argument, so it belongs here, not in default_args
)


# create task to create an empty file with the bash command touch. 
create_file = BashOperator(
    task_id='create_file',
    bash_command=f'touch {DEMO_FILE}',
    dag=dag,)


# create task to write to file again using the bash operator.
write_to_file = BashOperator(
    task_id='write_to_file',
    bash_command=f'echo "Hello World !" > {DEMO_FILE}',
    dag=dag,)


# create task to read the file using the cat bash command.
read_from_file = BashOperator(
    task_id='read_from_file',
    bash_command=f'cat {DEMO_FILE}',
    dag=dag,)


def message():
    logging.info("Airflow is awesome!")
    logging.info("Airflow uses DAGs...")
    logging.info(f"This DAG has created a file: {DEMO_FILE}")

message_task = PythonOperator(
    task_id="message_task",
    python_callable=message,
    dag=dag,)


# Specify the order of the tasks.
create_file >> write_to_file >> read_from_file >> message_task

```

### Task Dependencies

**Task Dependencies** can be described programmatically in Airflow using `>>` and `<<`:
* `a >> b` means **a comes before b**
* `a << b` means **a comes after b**

Task dependencies can also be set with `set_downstream` and `set_upstream`:
* `a.set_downstream(b)` means **a comes before b**
* `a.set_upstream(b)` means **a comes after b**

Tasks that can run in parallel are grouped in a list `[]`. For example, `[a, b] >> c` means that `c` runs after both `a` and `b`.
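The `>>` / `<<` syntax works through Python operator overloading: Airflow's task classes implement `__rshift__` and `__lshift__`, plus the reflected `__rrshift__` / `__rlshift__` so that a plain list can appear on the left-hand side. The toy `Task` class below reproduces only that mechanism; it is a hypothetical stand-in, not Airflow's `BaseOperator`.

```python
class Task:
    """Toy task that records dependencies via >> and << (not Airflow's class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        for t in other if isinstance(other, list) else [other]:
            self.downstream.add(t.task_id)
            t.upstream.add(self.task_id)

    def set_upstream(self, other):
        for t in other if isinstance(other, list) else [other]:
            t.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning `other` enables a >> b >> c

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (list on the left)
        self.set_upstream(other)
        return self

    def __rlshift__(self, other):  # [a, b] << self
        self.set_downstream(other)
        return self


extract, clean_a, clean_b, merge = (Task(n) for n in ["extract", "clean_a", "clean_b", "merge"])
extract >> [clean_a, clean_b]   # fan out
[clean_a, clean_b] >> merge     # fan in: list.__rshift__ doesn't exist, so merge.__rrshift__ runs
print(sorted(merge.upstream))   # ['clean_a', 'clean_b']
```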


### Variables

* **Variables** are a generic way to store and retrieve arbitrary content within Airflow. They are stored as simple key-value pairs in Airflow's metadata database.

<img src='imgs/af_variable.png' alt='af_variable' width=40%>

* Variables are useful for retrieving values at runtime, avoiding hard-coded values, and reducing code repetition within our DAGs.


* Before querying the metastore, Airflow checks **2 layers**: a configured secrets backend (if any) and environment variables named `AIRFLOW_VAR_<KEY>`. If the variable is found in one of these layers, Airflow doesn't need to open a database connection, which is cheaper.


*  Command Line Interface and Environment Variables Reference: https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#variables
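According to the Airflow documentation on secrets backends, a variable lookup checks a configured secrets backend first, then environment variables named `AIRFLOW_VAR_<KEY>`, and only then the metadata database. The sketch below models that order in plain Python; the dict standing in for the metastore and its contents are hypothetical. In a real DAG you would simply call `Variable.get("env")` (from `airflow.models import Variable`).

```python
import os

# Stand-in for the metadata database (hypothetical contents).
METASTORE = {"env": "dev", "bucket": "my-data-bucket"}


def get_variable(key, secrets_backend=None):
    """Simplified lookup: secrets backend -> AIRFLOW_VAR_<KEY> env var -> metastore."""
    if secrets_backend and key in secrets_backend:
        return secrets_backend[key]
    env_value = os.environ.get(f"AIRFLOW_VAR_{key.upper()}")
    if env_value is not None:
        return env_value        # found without touching the database
    return METASTORE[key]       # last resort: query the metadata database


os.environ["AIRFLOW_VAR_ENV"] = "prod"
print(get_variable("env"))     # prod (environment variable wins)
print(get_variable("bucket"))  # my-data-bucket (falls back to the metastore)
```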

### XComs

* **XComs** (short for **cross-communications**) are designed to exchange small amounts of data between tasks: they let tasks send messages to each other within our DAG. For example, if task_B depends on task_A, task_A can push data into an XCom and task_B can pull that data to use it.


* The sending task serializes the value and stores it in the metadata database together with its DAG ID, task ID, and DAG run (execution date); the receiving task later retrieves and deserializes it. In Airflow 1.x this used Python's `pickle` module, which **serializes** an object's structure into a byte stream and **deserializes** it back; since Airflow 2.0, values are serialized as JSON by default, and pickling must be enabled explicitly with the `enable_xcom_pickling` option. Either way, objects that can't be serialized can't be shared through XCom.


* The `xcom_push()` and `xcom_pull()` methods can be used to send and retrieve objects, respectively.


* The `xcom_push` and `xcom_pull` methods are only **accessible from a task instance object**. With the PythonOperator, we can get the task instance by declaring a `ti` parameter in the Python callable (Airflow 2 passes it automatically; Airflow 1.x also required `provide_context=True`).


* We can pull XComs from multiple tasks at once.


* In the BashOperator, `do_xcom_push` pushes the **last line** written to stdout into an XCom. By default, `do_xcom_push` is set to **True**.


* When an XCom is created automatically by returning a value from an operator, Airflow assigns it the key `return_value`, which indicates how it was created, and stores the value under that key in Airflow's metadata database.


* XComs create **implicit dependencies** between the tasks that are not visible from the UI.
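The push/pull flow can be modeled with a dictionary keyed by DAG ID, run, task ID, and key, with JSON serialization as in Airflow 2's default. `ToyXComStore` is a hypothetical stand-in for the XCom table, not Airflow's implementation. In a real PythonOperator callable the equivalent calls are `ti.xcom_push(key="rows", value=42)` and `ti.xcom_pull(task_ids="task_a", key="rows")`; passing a list of `task_ids` returns a list of values.

```python
import json


class ToyXComStore:
    """Dict-backed stand-in for the XCom table (not Airflow's implementation).

    Values are JSON-serialized and keyed by (dag_id, run_id, task_id, key).
    """

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, run_id, task_id, value, key="return_value"):
        self._rows[(dag_id, run_id, task_id, key)] = json.dumps(value)

    def pull(self, dag_id, run_id, task_ids, key="return_value"):
        if isinstance(task_ids, str):
            return json.loads(self._rows[(dag_id, run_id, task_ids, key)])
        # Pulling from several tasks at once returns one value per task.
        return [json.loads(self._rows[(dag_id, run_id, t, key)]) for t in task_ids]


store = ToyXComStore()
# Returned values land under the default key "return_value".
store.push("example", "run_1", "task_a", {"rows": 42})
store.push("example", "run_1", "task_c", [1, 2, 3])
print(store.pull("example", "run_1", "task_a"))              # {'rows': 42}
print(store.pull("example", "run_1", ["task_a", "task_c"]))  # [{'rows': 42}, [1, 2, 3]]
```

Because the value travels through the metadata database, keeping XComs small (IDs, paths, counts) rather than whole datasets avoids bloating it.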

### Macros

* **Jinja templates** and **Macros** in Apache Airflow are the way to pass dynamic data to our DAGs at runtime. 


* Templating allows us to interpolate values at run time in static files such as HTML or SQL files, by placing special placeholders in them indicating where the values should be and/or how they should be displayed.


* The double curly braces `{{ }}` are **placeholders** whose content is evaluated and replaced at **runtime**, each time the DAG gets triggered. For example, `{{ ds }}` is replaced by the DAG run's **execution date** in `YYYY-MM-DD` format.


* **Macros** are functions that take an input, modify it, and return the modified output. They are used in templates through the `macros` namespace, with the notation `{{ macros.macro_func() }}`, e.g. `{{ macros.ds_add(ds, 7) }}`.


* Apache Airflow brings predefined variables that we can use in our templates. They are very useful since they allow us to have information about the current executing DAG and task. Macros Reference: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html


*  We can also check our Jinja Templates rendered before even executing our DAG in the Rendered section of Airflow UI.
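To see what a macro computes, here is a plain-Python re-implementation of the date arithmetic performed by `macros.ds_add`, which adds (or subtracts) a number of days to a `YYYY-MM-DD` date string. This is an illustrative sketch rather than Airflow's source; the templated `bash_command` in the comment shows how the real macro would be used in a DAG.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Add (or subtract) days to a YYYY-MM-DD date string."""
    date = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return date.strftime("%Y-%m-%d")


# In a DAG the real macro is used inside a template, e.g.:
#   BashOperator(task_id="print_next_week", bash_command="echo {{ macros.ds_add(ds, 7) }}")
print(ds_add("2015-01-01", 5))   # 2015-01-06
print(ds_add("2015-01-06", -5))  # 2015-01-01
```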

### Plugins

Airflow was built with the intention of allowing its users to extend and customize its functionality through plugins. **Plugins** are an easy way to write, share, and activate custom runtime behavior.

Airflow has a simple built-in plugin manager that can integrate external features to its core by simply dropping files in your `$AIRFLOW_HOME/plugins` folder. The Python modules in the plugins folder get imported and integrated into Airflow’s main collections and become available for use to developers.

The most common types of user-created plugins for Airflow are **Operators** and **Hooks**. These plugins make DAGs reusable and simpler to maintain. We can also integrate sensors, macros, executors, and web views by dropping files in the plugin directory.

### Creating a custom Operator

An **Operator** is an atomic block of workflow logic, which performs a single action. All operators are derived from the `BaseOperator` base class.
* The `__init__` function can be used to **configure settings** for the task.
* The primary logic of our operator is captured in the `execute()`, which must be defined in an operator. This method is called when the task instance is executed.
* The execute method should be **idempotent**, since it may run several times when the task is retried, up to the maximum number of retries.
* Any value that the execute method returns is saved as an **XCom** under the key `return_value`.
* The execute method may also raise the `AirflowSkipException` from **airflow.exceptions**. In such a case the task instance would transition to the `Skipped` status.


To create a custom operator, follow these steps:
1. Identify Operators that perform similar functions and can be consolidated
2. Define a new Operator in the `plugins` folder
3. Replace the original Operators with the new custom operator
4. Re-parameterize and instantiate them
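The contract above (configuration in `__init__`, logic in `execute()`, return value as an XCom, skip via an exception) can be illustrated without installing Airflow. In real code the class would subclass `airflow.models.BaseOperator`, forward `**kwargs` to `super().__init__()`, and implement `execute(self, context)`. Here `ToyBaseOperator`, the local `AirflowSkipException` stand-in, the `FAKE_DB` row counts, `RowCountCheckOperator`, and the `run_task` runner are all hypothetical.

```python
class AirflowSkipException(Exception):
    """Local stand-in for airflow.exceptions.AirflowSkipException."""


class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


FAKE_DB = {"songs": 120, "users": 0}  # hypothetical table -> row count


class RowCountCheckOperator(ToyBaseOperator):
    """One consolidated, re-parameterizable check instead of one operator per table."""

    def __init__(self, task_id, table, min_rows, **kwargs):
        super().__init__(task_id, **kwargs)
        self.table = table        # configuration happens in __init__
        self.min_rows = min_rows

    def execute(self, context):
        # Read-only, so re-running on retry is safe (idempotent).
        count = FAKE_DB.get(self.table)
        if count is None:
            raise AirflowSkipException(f"{self.table} not loaded yet")
        if count < self.min_rows:
            raise ValueError(f"{self.table} has only {count} rows")
        return count  # would become the "return_value" XCom


def run_task(op, context):
    """Toy runner mapping execute() outcomes to task-instance states."""
    try:
        return "success", op.execute(context)
    except AirflowSkipException:
        return "skipped", None
    except Exception:
        return "failed", None


outcomes = {}
for table in ["songs", "users", "artists"]:
    op = RowCountCheckOperator(task_id=f"check_{table}", table=table, min_rows=1)
    outcomes[op.task_id] = run_task(op, context={})
print(outcomes)
```

The same class is instantiated three times with different parameters, which is step 4 above in miniature.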

### Sensors

**Sensors** are special operators that **wait (poll)** until a condition is met, for example on a long-running task, a file, a database row, an S3 key, another DAG/task, etc.

To create a Sensor, we define a subclass of `BaseSensorOperator` and override its **poke function**. The poke function will be called over and over every `poke_interval` seconds until one of the following happens:

* poke returns **True** – if it returns False it will be called again.
* poke raises an `AirflowSkipException` from **airflow.exceptions** – the Sensor task instance’s status will be set to `Skipped`.
* poke raises another exception, in which case it will be retried until the maximum number of retries is reached.
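The poke loop can be sketched in plain Python. In Airflow you would subclass `BaseSensorOperator` and implement `poke(self, context)`; sensors also accept a `timeout` after which they fail with `AirflowSensorTimeout`. In this sketch, `run_sensor`, the local `AirflowSkipException` stand-in, and the use of Python's `TimeoutError` are assumptions, and intervals are shortened to fractions of a second.

```python
import time


class AirflowSkipException(Exception):
    """Local stand-in for airflow.exceptions.AirflowSkipException."""


def run_sensor(poke, poke_interval=0.01, timeout=1.0):
    """Call poke() every poke_interval seconds until it returns True.

    Returns "success" or "skipped"; any other exception propagates (Airflow
    would then retry the task). Raises TimeoutError once `timeout` expires.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if poke():
                return "success"
        except AirflowSkipException:
            return "skipped"
        if time.monotonic() >= deadline:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)


calls = {"n": 0}


def file_has_landed():
    """Hypothetical condition: the file 'appears' on the third check."""
    calls["n"] += 1
    return calls["n"] >= 3


status = run_sensor(file_has_landed)
print(status, calls["n"])  # success 3
```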

### Hooks and Connections

**Connections** can be accessed in code via **hooks**. Hooks provide a *reusable interface to external systems and databases*. With hooks, we don’t have to worry about how and where to store these connection strings and secrets in our code. Airflow comes with many Hooks that can integrate with common systems. Here are a few common ones:
* HttpHook
* PostgresHook (works with Redshift)
* MySqlHook
* SlackHook
* PrestoHook
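Connections are usually created in the UI, but Airflow can also read them from environment variables named `AIRFLOW_CONN_<CONN_ID>` whose value is a URI such as `postgres://user:password@host:5432/schema`; a hook then only needs the connection ID, e.g. `PostgresHook(postgres_conn_id="my_postgres")`. The sketch below uses `urllib.parse` to show which connection fields such a URI carries. It is a simplified illustration, not Airflow's `Connection` parser, and the credentials are made up.

```python
from urllib.parse import urlsplit, unquote


def parse_conn_uri(uri):
    """Split a connection URI into Airflow-style connection fields (simplified)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "schema": parts.path.lstrip("/"),
    }


# Hypothetical value of AIRFLOW_CONN_MY_POSTGRES
conn = parse_conn_uri("postgres://analyst:s3cret@db.example.com:5432/analytics")
print(conn)
```

Keeping credentials in a connection (or a secrets backend) means the DAG code only ever references `my_postgres`, never the password itself.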

# Readings

* https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-1/v-5/
