This blog is a collection of my notes on Airflow. I wrote it to consolidate all of my learning in one place, together with a good hands-on project (there are already several excellent materials on the web). I have tried to present all the concepts and jargon lucidly, to make our understanding of Airflow clear and to the point. However, before you jump in and dig into it, there are two prerequisites.

🔴 Python

🔴 Extract-Transform-Load aka ETL

The reason for those prerequisites is apparent: whether we build a report or a machine learning project, ETL is a must for both, and since Airflow pipelines are written in Python, we cannot avoid it.

These notes will be enhanced in the future with more features and topics. For the existing content, if you feel something is inaccurate or misleading, please feel free to contribute or highlight it.

Contents


Introduction

Airflow is a batch-oriented framework for creating data pipelines.

It uses DAGs to create data processing networks or pipelines.

  • DAG stands for Directed Acyclic Graph. It flows in one direction; you can't come back to the same point, i.e. it is acyclic.

  • In many data processing environments, a series of computations are run on the data to prepare it for one or more ultimate destinations. This type of data processing flow is often referred to as a data pipeline. A DAG or data processing flow can have multiple paths, also called branching.

The simplest DAG could be like this.

flowchart LR
A[task-read-data-from-some-endpoint] --> |dependency| B[task-write-to-storage]

where

  • read-data-from-some-endpoint & write-to-storage - represent a task (unit of work)
  • Arrow --> represents the processing direction and the dependency that determines when the next task will be triggered.

Ok, so why should we use Airflow?

  • If you like Everything As Code, and everything means everything, including your configurations. This helps to create pipelines of any complexity to solve the problem.
  • If you like open source, because almost everything you need is available as an inbuilt operator or executor.
  • Backfilling features. It enables you to reprocess historical data.

And, why shouldn't you use Airflow?

  • If you want to build a streaming data pipeline.

Airflow Architecture

So, we now have at least an idea that Airflow was created to build data pipelines. Below we can see the different components of Airflow and their internal connections.

Airflow Architecture

Airflow installs these components implicitly when we install it, and together they facilitate the execution of pipelines. These components are

  • DAG directory, keeps all DAG files in one place so they can be read by the scheduler and executor.
  • Scheduler, parses DAGs, checks their schedule interval and starts scheduling DAG tasks for execution by passing them to the Airflow workers.
  • Workers, responsible for doing the actual work. They pick up tasks and execute them.
  • Web server, presents a handy user interface to inspect, trigger, and debug the behaviour of DAGs and tasks.
  • Metadata Database, used by the scheduler, executor, and webserver to store state so that all of them can communicate and take decisions. Follow this link to see how to set and get metadata variables: Accessing Metadata Database

For now, this is enough architecture. Let's move to the next part.


Installing Airflow

Airflow provides many options for installation. You can read about all of them in the official Airflow documentation and then decide which option suits your needs. However, to keep it simple and experimental, I will go ahead with the Docker way of installation.

Installing Airflow with Docker is simple and intuitive, which helps us to understand the typical features and workings of Airflow. Below are the prerequisites for running Airflow in Docker.

  • Docker Community Edition installed on your machine. Check this link for Windows and Mac. I followed this blog for Docker installation on Mac.
  • Docker Compose installation.

Caveats - You need at least 4GB of memory for the Docker engine.

Docker memory setting

Refer to these links if you are trying to install Airflow on Windows (I haven't tried it myself) - link1 and link2

Installation Steps (on Mac)

  1. Create a file named airflow_runner.sh and copy the below commands into the script.
# Check how much memory is available to the Docker engine (should be at least 4GB)
docker run --rm "debian:buster-slim" bash -c 'numfmt --to iec $(echo $(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE))))'

# Download the official docker-compose file for Airflow 2.2.4
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'

# Create the directories mounted into the containers
mkdir -p ./dags ./logs ./plugins

# Set the host user id so files created by the containers are owned by you
echo -e "AIRFLOW_UID=$(id -u)" > .env
  2. Provide execute access to the file: chmod +x airflow_runner.sh
  3. Run source airflow_runner.sh
  4. Once the above steps are completed successfully, run docker-compose up airflow-init to initialise the database.

After initialisation is complete, you should see a message like the below.

airflow-init_1       | Upgrades done
airflow-init_1       | Admin user airflow created
airflow-init_1       | 2.3.0
start_airflow-init_1 exited with code 0

Now, we are ready to go for the next step.


Starting Docker Airflow project

👉 docker-compose up --build

The above command starts the Docker environment and runs the below services as well:

  • Webserver
  • Scheduler
  • Postgres database for metastore.

After a few seconds, when everything is up, the webserver is available at http://localhost:8080. The default account has the login airflow and the password airflow.

From the terminal, you can also run docker ps to check the processes which are up and running.

Cleaning up

To stop and delete containers, delete volumes with database data, and remove downloaded images, run:

👉 docker-compose down --volumes --rmi all


Fundamentals of Airflow

We have installed Airflow and know at a high level what it stands for, but we have yet to discover how to build our pipeline.

We will roughly touch on a few more concepts and create a full-fledged project using these concepts.

So, let's refresh our memory one more time. Airflow works on DAG principle, and DAG is an acyclic graph. We saw this example read-data-from-some-endpoint --> write-to-storage

So, to create an Airflow DAG, we will write it like this.

  • Step 1

Create a DAG. It accepts a unique name, when to start it, and what the interval of running could be. It accepts many more parameters, but let's stick with these three.

import airflow
from airflow import DAG

dag = DAG(
    dag_id="my_first_dag",
    start_date=airflow.utils.dates.days_ago(2),
    schedule_interval=None,
)
  • Step 2

And now, we need to create our two functions (I'm creating dummy functions), which we will attach to the Operator.

def read_data_from_some_endpoint():
    pass

def write_to_storage():
    pass
 
  • Step 3

Let's create our operators. We have Python functions which need to be attached to some Operators. The last argument the operator accepts is a DAG; here we tell the operator which DAG it belongs to.

from airflow.operators.python import PythonOperator

download_data = PythonOperator(  # This is our Airflow Operator.
    task_id="download_data",  # unique name; it could be any name
    python_callable=read_data_from_some_endpoint,  # python function/callable
    dag=dag,  # Here we attach our operator to the dag which we created in step 1.
)

persist_to_storage = PythonOperator(
    task_id="persist_to_storage",
    python_callable=write_to_storage,
    dag=dag,
)
  • Step 4

Now, let's create the execution order of our operators.

download_data >> persist_to_storage  # >> is the bit shift operator in Python, which is overridden in Airflow to indicate the direction of task flow.

That's it. We have successfully created our first DAG.

Define your Task and DAG

Airflow provides three ways to define your DAG

  1. Classical
  2. with context manager
  3. Decorators

It doesn't matter which way you define your workflow, but sticking to only one helps to debug and review the codebase. Mixing different styles can make the code confusing (though it is a personal choice).

# Classical

import pendulum
from airflow import DAG
from airflow.operators.dummy import DummyOperator

dag = DAG("classical_dag", start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
          schedule_interval="@daily", catchup=False)

op = DummyOperator(task_id="a-dummy-task", dag=dag)

# with context manager

with DAG(
    "context_manager_dag", start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
    schedule_interval="@daily", catchup=False
) as dag:
    op = DummyOperator(task_id="a-dummy-task")

# Decorators

from airflow.decorators import dag

@dag(start_date=pendulum.datetime(2022, 5, 15, tz="UTC"),
     schedule_interval="@daily", catchup=False)
def generate_decorator_dag():
    op = DummyOperator(task_id="a-dummy-task")

dag = generate_decorator_dag()

How to create a slightly more complex task flow?

Let's take this example.

example_dag

We can see that two color codes have been used in the above image.

light red - shows branch flow (two or more flows), i.e. branch_1, branch_2

light green - normal tasks for different purposes, i.e. false_1, false_2, true_2 etc.

Now, without worrying about code, let's create the task flow to represent the above structure.

1- Lower workflow from branch_1

branch_1 >> true_1 >> join_1

2- Upper workflow from branch_1

  • The upper flow has two sections. The first part goes up to branch_2

branch_1 >> false_1 >> branch_2

  • and then at branch_2, two parallel executions happen and continue up to false_3

branch_2 >> false_2 >> join_2 >> false_3

branch_2 >> true_2 >> join_2 >> false_3

Since false_2 and true_2 happen in parallel, we can merge them (put them in a list) in this way

branch_2 >> [true_2, false_2] >> join_2 >> false_3

and finally, we can merge the above steps like this

branch_1 >> false_1 >> branch_2 >> [true_2, false_2] >> join_2 >> false_3 >> join_1

So we have got these two from step 1 and step 2

branch_1 >> true_1 >> join_1

branch_1 >> false_1 >> branch_2 >> [true_2, false_2] >> join_2 >> false_3 >> join_1

and that represents the execution order of the tasks in the DAG.

How does the bit shift operator (>> or <<) define task dependency?

The __rshift__ and __lshift__ methods of the BaseOperator class implement the Python bit shift operators in the context of setting a task or a DAG downstream of another. See the implementation here.

So, bit shift is used as syntactic sugar for set_upstream (<<) and set_downstream (>>).

For example, task1 >> task2 is the same as task2 << task1, which is the same as task1.set_downstream(task2), which is the same as task2.set_upstream(task1).

This operator plays an important role in building relationships among the tasks.

Effective Task Design

The created task should follow

1- Atomicity

This means either everything occurs, or nothing happens. Each task should do only one activity; if that is not the case, split the functionality into individual tasks.

2- Idempotency

An Airflow task is said to be idempotent if calling the same task multiple times with the same inputs has no additional effect. This means that rerunning a task without changing the inputs should not change the overall output.

for data load: It can be made idempotent by checking for existing results or ensuring that the task overwrites previous results.

for database load: upsert can be used to overwrite or update previous work done on the tables.
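
As a rough illustration of the file-based case, the output path can be derived from the execution date so that rerunning the same interval simply overwrites its own output. A minimal sketch (the path and payload below are hypothetical):

import json

def export_daily_snapshot(**context):
    # Hypothetical idempotent write: the output path is derived from the logical date,
    # so a rerun of the same interval overwrites the same file instead of duplicating data.
    ds = context["ds"]  # e.g. "2022-05-15"
    output_path = f"/data/exports/snapshot_{ds}.json"  # assumed location
    records = {"date": ds, "rows": []}  # placeholder payload
    with open(output_path, "w") as f:  # "w" mode overwrites previous results
        json.dump(records, f)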

3- Backfilling previous runs

This property helps to process historical data. The DAG class can be initialised with the property catchup, as shown in the sketch below.

If catchup=False -> Airflow starts scheduling from the current interval only.

If catchup=True -> This is the default. Airflow schedules runs for all past intervals between start_date and now.
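
A minimal sketch of setting catchup while defining a DAG (the dag_id and dates are placeholders):

import pendulum
from airflow import DAG

dag = DAG(
    dag_id="catchup_example",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,  # only the latest interval is scheduled; set to True to backfill past intervals
)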

Runtime Variables

All operators load context, a pre-loaded set of the most used variables supplied during the DAG run. A Python example is shown here.

from urllib import request
 
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
 
dag = DAG(
    dag_id="show_context",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)
 
def _show_context(**context):
    """
    context here contains these preloaded items 
    to pass in dag during runtime.

    Airflow’s context dictionary can be found in the
    get_template_context method, in Airflow’s models.py.
    
    {
    'dag': task.dag,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'END_DATE': ds,
    'end_date': ds,
    'dag_run': dag_run,
    'run_id': run_id,
    'execution_date': self.execution_date,
    'prev_execution_date': prev_execution_date,
    'next_execution_date': next_execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'ti': self,
    'task_instance_key_str': ti_key_str,
    'conf': configuration,
    'test_mode': self.test_mode,
    }
    """
    start = context["execution_date"]
    end = context["next_execution_date"]
    print(f"Start: {start}, end: {end}")
 
 
show_context = PythonOperator(
   task_id="show_context", 
   python_callable=_show_context, 
   dag=dag
)

The above variables are pre-loaded under context and can be used anywhere in the operator. Dynamic references are resolved via Jinja templating.

e.g. {{ds}}, {{next_ds}}, {{dag_run}}
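
For instance, a templated BashOperator command could reference these variables directly; a small sketch reusing the dag object from the example above (the task and command are made up):

from airflow.operators.bash import BashOperator

print_interval = BashOperator(
    task_id="print_interval",
    # {{ ds }} and {{ next_ds }} are rendered by Jinja at runtime
    bash_command='echo "processing data from {{ ds }} to {{ next_ds }}"',
    dag=dag,
)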

Templating fields and scripts

Two attributes in the BaseOperator define what can be templated.

template_fields: Holds the list of variables which are templateable

template_ext: Contains a list of file extensions that can be read and templated at runtime

See this example for those two fields' declaration

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env') # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env=None,
        output_encoding='utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

Example of DAG which uses Airflow context for templating

Let's take an example to showcase the power of templating

from airflow.operators.bash import BashOperator

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}",
)

👉 Note

If you look here, we used a macro to call datetime.now(). If we don't use the macro, it raises the jinja2.exceptions.UndefinedError: 'datetime' is undefined exception.

Check here for the macro list.

But now you might be wondering where we got PythonOperator, DAG, etc. from. We will look at Airflow's key modules to understand it.

👉 This topic is covered in detail in this blog.


Airflow Module Structure

Airflow has a standard module structure. It keeps all its important packages under airflow. A few of the essential modules are listed here.

  • airflow - For DAG and other base API.
  • airflow.executors : For all inbuilt executors.
  • airflow.operators : For all inbuilt operators.
  • airflow.models : For DAG, log error, pool, xcom (cross-communication) etc.
  • airflow.sensors : Different sensors (in simple words, it is either time interval or file watcher to meet some criteria for task executions)
  • airflow.hooks : Provides different modules to connect external API services or databases.

So, by looking at the above module, we can quickly determine that to get PythonOperator or any other Operator, we need to import them from airflow.operators. Similarly, an executor can be imported from airflow.executors and so on.

Apart from that, many provider packages, from vendors and third parties, enhance the capability of Airflow. All providers follow the apache-airflow-providers nomenclature for the package build. Providers can contain operators, hooks, sensors, and transfer operators to communicate with many external systems, and they can also extend Airflow core with new capabilities.

This is the list of providers - providers list
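
For example, a provider package is installed separately and its operators and hooks are then imported like any other Airflow module. A sketch with the Postgres provider (assuming you need Postgres connectivity):

# pip install apache-airflow-providers-postgres
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook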


Workloads

Operators

Operators help run your function or any executable program.

Type of Operators

Primarily there are three types of Operators.

(i) Operators

These help to trigger certain actions. A few of them are

  • PythonOperator - To wrap a Python callable/function inside it.
  • BashOperator - To call your bash script or command. Within BashOperator we can also call any executable program.
  • DummyOperator - To represent a dummy (no-op) task.
  • DockerOperator - To execute a command inside a Docker container.
  • EmailOperator - To send an email (using SMTP configuration).

and many more operators exist. See the full operators list in the official documentation.

(ii) Sensors

A particular type of operator whose purpose is to wait for an event or condition before starting execution. For instance,

  • ExternalTaskSensor waits on another task (in a different DAG) to complete execution.
  • S3KeySensor waits for a specific key (file or directory) to be available in an S3 bucket.
  • NamedHivePartitionSensor - Waits for a set of partitions to appear in Hive.

(iii) Transfers

Moves data from one location to another. e.g.

  • MySqlToHiveTransfer - Moves data from MySQL to Hive.
  • S3ToRedshiftTransfer - Loads files from S3 to Redshift.

Scheduler

The scheduler is the crucial component of Airflow, as most of the magic happens here. It determines when and how your pipelines are executed. At a high level, the scheduler runs through the following steps.

  1. Once users have written their workflows as DAGs, the scheduler reads the files containing these DAGs to extract the corresponding tasks, dependencies, and schedule intervals of each DAG.

  2. For each DAG, the scheduler then checks whether the schedule interval for the DAG has passed since the last run. If so, the tasks in the DAG are scheduled for execution.

  3. For each scheduled task, the scheduler then checks whether the task's dependencies (= upstream tasks) have been completed. If so, the task is added to the execution queue.

  4. The scheduler waits several moments before starting a new loop by jumping back to step 1.

👉 To start a scheduler, just run the airflow scheduler command.

In Airflow, while defining the DAG, we provide a few options to let the scheduler know when jobs are required to be triggered.

start_date -> when to start scheduling the DAG.

end_date -> when to stop scheduling the DAG.

schedule_interval -> time interval for subsequent runs: hourly, daily, a cron expression, etc.

depends_on_past -> Boolean; when True, a task instance runs only if the same task succeeded in the previous DAG run.

retry_delay -> time to wait before the next retry. It accepts a timedelta object, e.g. for 2 minutes we write timedelta(minutes=2).

Airflow scheduler works on the principle of Cron based job execution. Below is the cron presentation.

β”Œβ”€β”€β”€β”€β”€β”€β”€ minute (0 - 59)
β”‚ β”Œβ”€β”€β”€β”€β”€β”€ hour (0 - 23)
β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€ day of the month (1 - 31)
β”‚ β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€ month (1 - 12)
β”‚ β”‚ β”‚ β”‚ β”Œβ”€β”€β”€β”€ day of the week (0 - 6) (Sunday to Saturday;
β”‚ β”‚ β”‚ β”‚ β”‚      7 is also Sunday on some systems)
* * * * *

every 5th minute -> */5 * * * *

every hour at minute 30, e.g. at 10:30, 11:30, 12:30 and so on -> 30 * * * *

If you haven't worked with a Unix-based cron scheduler before, it can be tough to know how to write these expressions exactly (it's tricky for experienced developers as well).

Check this website to generate cron expression - https://www.freeformatter.com/cron-expression-generator-quartz.html
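
Tying these options together, a DAG scheduled with a cron expression and retry settings could look roughly like this (a sketch; the dag_id, dates and values are placeholders):

from datetime import timedelta

import pendulum
from airflow import DAG

dag = DAG(
    dag_id="cron_scheduled_dag",
    start_date=pendulum.datetime(2022, 5, 1, tz="UTC"),
    schedule_interval="30 * * * *",  # every hour at minute 30
    default_args={
        "retries": 2,  # retry a failed task twice
        "retry_delay": timedelta(minutes=2),
    },
    catchup=False,
)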

Executors

Executors run the task instances (task instances are the functions which we have wrapped inside operators).

Types of Executors

There are two types of executors

Local Executors

  • Debug Executor - The DebugExecutor is a debug tool and can be used from an IDE. It is a single-process executor that queues tasks and executes them.
  • Sequential Executor - The default executor; it runs within the scheduler and executes one task instance at a time, which makes it a poor candidate for production.
  • Local Executor - Runs within the scheduler and executes multiple task instances at a time. Still not a good candidate for production, as it doesn't scale beyond one machine.

Remote Executors

  • Celery Executor - Run tasks on dedicated machines(workers). It uses distributed task queue to distribute loads to different workers to parallelise work. It horizontally scales, making it fault-tolerant and a good candidate for production.
  • Kubernetes Executor - Run tasks in dedicated POD(worker), and Kubernetes APIs get used to managing the POD. It scales efficiently and is a perfect candidate for production.
  • CeleryKubernetes Executor - It allows users to run a CeleryExecutor and a KubernetesExecutor simultaneously. An executor is chosen to run a task based on the task's queue. Choice of this executor is only needed in a few cases.
  • Dask Executor - Runs tasks on a Dask cluster, which can run on a single machine or across remote networks.

Hooks

A high-level interface to establish a connection with databases or other external services.

List of different available hooks
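
As a rough example, a hook can be used inside a Python callable to query a database. This sketch assumes the Postgres provider is installed and a connection with ID my_postgres is configured in Airflow; the table name is a placeholder:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_row_count():
    # "my_postgres" is a hypothetical connection ID defined under Admin -> Connections
    hook = PostgresHook(postgres_conn_id="my_postgres")
    records = hook.get_records("SELECT count(*) FROM my_table")  # my_table is a placeholder
    print(records[0][0])

The callable could then be wrapped in a PythonOperator like any other task.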

What if something I'm interested in is not present in any of the modules?

Didn't find the right operator, executor, sensor or hook? No worries, you can write your own custom one. Airflow provides base classes which we can inherit to write our custom classes.

from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base import BaseHook
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator):

    @apply_defaults  # for default parameters from the DAG
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, context):  # context is the same runtime context we saw earlier
        # your logic
        pass


class MyCustomSensor(BaseSensorOperator):

    @apply_defaults  # for default parameters from the DAG
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def poke(self, context):
        # your logic; return True once the condition you are waiting for is met
        pass


class MyCustomHook(BaseHook):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def get_conn(self):
        # your logic to build and return a connection to the external system
        pass
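
Once defined, a custom operator is instantiated in a DAG just like an inbuilt one. A minimal sketch, assuming the dag object created earlier:

my_custom_task = MyCustomOperator(
    task_id="my_custom_task",  # any unique name within the DAG
    dag=dag,
)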

Managing task dependencies

Most of the time in a workflow, before executing a task we need to check that its prerequisites have been met. These are called dependencies, and there are different types.

Dependencies

1- Linear dependencies - Each task must be completed before executing the next. For instance,

flowchart LR
A[first_task] --> B[second_task] --> c[third_task] 

Airflow representation

first_task >> second_task >> third_task

first_task must be completed before second_task and third_task, and second_task must be completed before third_task.

2- Fan-In and Fan-Out dependencies - In this type of dependency, a task waits for multiple tasks to finish. For example,

flowchart LR
A[first_task] --> B[second_task] --> C[third_task] 
D[extra_dependent_task] --> C[third_task]
C[third_task] --> E[end_task]

Airflow representation

first_task >> second_task >> third_task >> end_task

extra_dependent_task >> third_task

Branching

Branching helps to decide which task or path of the DAG to execute.

1- Branching within the Task

The task execution path is handled programmatically:

def _decide_task(**context):
    if context["execution_date"] < ROLLOUT_DATE:
        old_task(**context)
    else:
        new_task(**context)

...

task_branching = PythonOperator(
    task_id="task_branching",
    python_callable=_decide_task,
)

Cons - It is difficult to know from the DAG visualization which path was executed and why.

2- Branching within the DAG

The DummyOperator helps to structure branching (and the joining of branches) within DAGs.

Let's create below DAG

flowchart LR
A[start] --> B1[initialize_system] --> B[run_job] --> C[combine_task] 
B1[initialize_system] --> D1[new_data] --> J[joined_data] 
B1[initialize_system] --> D2[old_data] --> J[joined_data] 
B1[initialize_system] --> R[reference_data] --> J[joined_data] --> C[combine_task]
C[combine_task] --> F[end_task]

So above, we run our job daily, and there are two datasets (old and new) that need to be merged with the reference data to supply input to the combine_task task.

This way, joined_data will start executing as soon as all of its parents have finished executing without any failures, allowing the flow to continue after the branch.

However, to make it a bit better, we can add a dummy task to mark the completion of joining the old and new datasets. It is better to safeguard that the old and new data are surely available for the next run.

for example,

from airflow.operators.dummy import DummyOperator
 
join_branch = DummyOperator(
    task_id="join_old_and_new_data",
    trigger_rule="none_failed"
)

Our workflow will look like this

flowchart LR
A[start] --> B1[initialize_system] --> B[run_job] --> C[combine_task] 
B1[initialize_system] --> D1[new_data] --> D3[join_old_and_new_data] 
B1[initialize_system] --> D2[old_data] --> D3[join_old_and_new_data] 
D3[join_old_and_new_data] --> J[joined_data] 
B1[initialize_system] --> R[reference_data] --> J[joined_data] --> C[combine_task]
C[combine_task] --> F[end_task]

Adding an extra task for the old and new datasets makes the visual clearer and also gives us a control point to make sure that the data is available for the next run.

Trigger Rule

All Airflow operators provide a trigger_rule argument, which defines the condition for triggering the task; the default is to trigger the task when all directly upstream tasks have succeeded.

  • All existing options are
    • all_success: (default) all parents have succeeded
    • all_failed: all parents are in a failed or upstream_failed state
    • all_done: all parents are done with their execution
    • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
    • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
    • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
    • none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
    • dummy: dependencies are just for show, trigger at will
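
For instance, a cleanup task that should fire as soon as any of its upstream tasks fails could declare the rule like this (a sketch; the task is hypothetical and a dag object is assumed):

from airflow.operators.dummy import DummyOperator

cleanup_on_failure = DummyOperator(
    task_id="cleanup_on_failure",
    trigger_rule="one_failed",  # fires as soon as at least one parent has failed
    dag=dag,
)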

Cross Communication aka XCOM

XCOM is used to share data between the tasks.

xcom_push - registers data in the Airflow metadata DB

xcom_pull - uses registered data from the Airflow metadata DB

example

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG('xcom_example',
          schedule_interval=None,
          default_args=default_args)

task_1 = BashOperator(task_id="task_1",
                    bash_command='echo "{{ ti.xcom_push(key="xcom-key", value="xcom-data-to-share") }}"',
                    dag=dag)

task_2  = BashOperator(task_id="task_2",
                       bash_command='echo "{{ ti.xcom_pull(key="xcom-key") }}"',
                       dag=dag)

task_1 >> task_2

Limitations

  • Don't use it to share BIG DATA (be mindful that Airflow is an orchestrator, not a computation framework).
  • Any value stored by an XCom needs to support being serialized.
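
As a side note, with PythonOperator the same sharing can be done via return values: whatever the callable returns is pushed as an XCom (under the key return_value) and can be pulled downstream. A minimal sketch, assuming the dag object from the example above:

from airflow.operators.python import PythonOperator

def produce_value():
    return "xcom-data-to-share"  # the returned value is pushed to XCom automatically

def consume_value(**context):
    value = context["ti"].xcom_pull(task_ids="produce_value")  # pull by upstream task_id
    print(value)

produce = PythonOperator(task_id="produce_value", python_callable=produce_value, dag=dag)
consume = PythonOperator(task_id="consume_value", python_callable=consume_value, dag=dag)

produce >> consume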

Accessing Metadata Database

The Airflow metadata database can be used for storing DAG configurations, table names, constants and IDs. It uses key-value pairs to maintain these variables. Stored configuration (variables) can be accessed via Variable.

from airflow.models import Variable

# Suppose we have kept dag_config in the metadata database in this format
# dag_config = {"key1": "value1", "key2": ["a", "b", "c"]}
# then below is the way to retrieve them in the dag
get_dag_config = Variable.get("dag_config", deserialize_json=True)
config1 = get_dag_config["key1"]
config2 = get_dag_config["key2"]


# If a variable is simply saved in this format key1 = value1, then we use
get_key1 = Variable.get("key1")

# Similarly we can set variables in the metadata database.
Variable.set("my_key", "my_value")

Accessing via command line

# import variables from a json file
docker-compose run --rm webserver airflow variables import /usr/local/airflow/dags/config/dag_config.json

# get the value of key1
docker-compose run --rm webserver airflow variables get key1

Testing

For Airflow, generally the tests below can be performed.

  • DAG Validation Test - To test whether the DAG is valid and acyclic.

  • Unit Test - To test Python functions, operators, etc.

  • Integration Test - To test whether the tasks of the workflow can connect to each other and to external systems.

  • Workflow Test - To test the complete pipeline.

Check these blogs for in-depth knowledge.
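
As an example of a DAG validation test, a common pattern is to load every DAG file through a DagBag inside a pytest test and assert that there are no import errors. A sketch, assuming the DAG files live in a local dags folder:

from airflow.models import DagBag

def test_dags_load_without_errors():
    # Parse all DAG files in the (assumed) ./dags folder, skipping Airflow's bundled examples
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"Import errors: {dag_bag.import_errors}"
    assert len(dag_bag.dags) > 0  # at least one DAG was parsed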

Best Practices

  • Write clean DAGs and stick to one way of creating them (with a context manager or without).
  • Stick to a better naming convention while writing your task name. Be explicit and logical.
  • Keep computation code (SQL, scripts, Python code etc.) and the DAG definition separate. Every time the DAG file is parsed, its top-level code is re-executed, so heavy code makes it slower to load.
  • Don't hardcode either constant value or any sensitive connection information in the code. Manage it in the config file or at the central level in a secure way.
  • Create tags and use them to group DAGs for a quick look while monitoring.
  • Always search for existing inbuilt airflow operators, hooks or sensors before creating your custom stuff.
  • XCOM is not for heavy data transfer.
  • Data Quality and Testing often get overlooked. So make sure you use a standard for your codebase.
  • Follow load strategies (incremental loads, SCD types) in your code to avoid unnecessary data loads.
  • If possible, create a framework for DAG generations. A meta wrapper. Check out this repo.
  • Specify configuration details once. For example, the location of SQL templates can be configured as an Airflow Variable and looked up as a global parameter when the DAG is instantiated.
  • Pool your resources: All task instances in the DAG use a pooled connection to the DWH by specifying the pool parameter.
  • Manage login details in one place - Connection settings are maintained in the Admin menu.
  • Know when to start a task. Normal scheduler or Event Trigger?

Where to go from here?

An essential part of learning anything is to create a project. When we build something, we understand the working principle and its terminologies the hard way. We have touched on almost everything we need to work on any Airflow project. However, you might need to come back here again to refresh the theory or google some advanced topics once we start working on the project (for example, dynamic DAG and task creation (DAG factory), deployment, monitoring, DAG management, data quality checks, unit tests, etc.). Do let me know if you want to see more details on those topics.

But for now, we are set to go. So, let's create a project and put our learning into action 💪

Check out this GitHub repo for the project. Airflow - Chapel Hill Survey Data Analysis

Reference

The Airflow community is very active, and many contributors across the globe are enriching and creating several packages and utilities to make developers' lives easier. It is always good to follow the community and people on Twitter and GitHub to keep in touch with new releases. Most of the details in these notes are taken from the links below. Check them out for more information.
