---
title: Simple Sample DAG
sidebar: platform_sidebar
redirect_to:
---

This tutorial walks through some of the core Apache Airflow concepts and how they fit into a simple, sample Airflow DAG.

Sample Pipeline Definition

Here is an example of a basic pipeline definition. Below it, you'll find a line-by-line explanation of the various pipeline components.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 9, 10, 0, 0),
    'random_logic': False
}

dag = DAG(
    'sample_dag',
    schedule_interval="@once",
    default_args=args
)

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = DummyOperator(
    task_id='load_data',
    dag=dag
)

t3 = DummyOperator(
    task_id='random_task',
    dag=dag
)

t1.set_downstream(t2)
t2.set_downstream(t3)

Importing Modules

First things first, we need to import all of the necessary modules.

from datetime import datetime

# DAG Object - we need this to instantiate the DAG
from airflow import DAG

# Operators - these are needed for the DAG to actually operate. We are just using Dummy operators in this example
from airflow.operators.dummy_operator import DummyOperator

Default Arguments

Here, we define a set of default arguments that are automatically passed to each task in the DAG. Setting default arguments is not mandatory, but doing so can reduce redundancy later in the pipeline.

In this example, we use the default args to define a common owner, start_date, and random_logic flag for all of the tasks.

args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 9, 10, 0, 0),
    'random_logic': False
}
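
Note that an argument set directly on a task takes precedence over the corresponding entry in default_args. As a rough illustration (the task below is hypothetical and not part of the sample pipeline), a single task could override the shared owner like this:

t_override = DummyOperator(
    task_id='override_example',
    owner='data_team',  # overrides the 'airflow' owner inherited from default_args
    dag=dag
)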

Instantiate the DAG

Before we outline the tasks we want to run, we need a DAG to nest them in. First, we pass in a string for dag_id to serve as a unique identifier for the DAG. In this case, we keep it simple and use sample_dag.

Next, we define a schedule_interval that dictates how often the DAG will run (just @once here).

Finally, we pass in the default argument dictionary we defined earlier as our default_args.

dag = DAG(
    'sample_dag',
    schedule_interval="@once",
    default_args=args
)
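
The @once preset is just one option: schedule_interval also accepts other presets, cron expressions, and timedelta objects. The DAG below is purely illustrative (a hypothetical sample_dag_daily, not part of this tutorial's pipeline):

daily_dag = DAG(
    'sample_dag_daily',
    schedule_interval='@daily',             # preset: run once per day at midnight
    # schedule_interval='0 6 * * *',        # alternative: cron expression for 06:00 daily
    # schedule_interval=timedelta(hours=1), # alternative: requires `from datetime import timedelta`
    default_args=args
)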

Creating Tasks

Great! Now that we have a DAG, we can start defining tasks that we want to run. As mentioned in the Core Airflow Concepts doc, tasks are generated by instantiating operator objects.

For this sample, we just used dummy operators to create three generic tasks - extract_data (t1), load_data (t2), and random_task (t3) - but you can get a lot more creative with your tasks given the wide range of operators that are available; a brief BashOperator sketch follows the code block below.

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = DummyOperator(
    task_id='load_data',
    dag=dag
)

t3 = DummyOperator(
    task_id='random_task',
    dag=dag
)
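
As a sketch of a less generic task, here is what a BashOperator task might look like in the same DAG (this task is illustrative and not part of the sample pipeline above):

from airflow.operators.bash_operator import BashOperator

t4 = BashOperator(
    task_id='print_date',
    bash_command='date',  # runs the `date` shell command when this task executes
    dag=dag
)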

Setting up Dependencies

Since the tasks we created aren't inherently dependent on each other, we need to explicitly define the dependencies between them. The way we have defined dependencies in this example pipeline results in the following:

t1 runs first

t2 runs after t1 completes successfully

t3 runs after t2 completes successfully

t1.set_downstream(t2)
t2.set_downstream(t3)

We can also use bitshift operators to set dependencies.

t1 >> t2
t2 >> t3
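
Bitshift dependencies can also be chained on a single line, which reads naturally for a simple linear pipeline like this one:

t1 >> t2 >> t3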

With that, we have successfully written an Airflow DAG! Head here to see how to deploy this DAG. Or, check out our docs on example DAGs and DAG Writing Best Practices.