Creating a DAG

To create a DAG, you define it in a Python script (the DAG file) that contains all the details of the DAG’s tasks and their dependencies. This file acts as a configuration file that specifies the DAG’s structure as code.

The five steps that must be taken to run a DAG

Step 1: Importing the Required Libraries

In [3]:
from datetime import datetime, timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

Step 2: Defining the Default Arguments

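The next step is to define the default arguments, a dictionary of parameters that is passed to the DAG and applied to each of its tasks. Among other things, these control when scheduling starts (start_date) and how often a failed task is retried (retries and retry_delay).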

In [4]:
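args = {
    'owner': 'Admin',
    # Note: Airflow's documentation recommends a fixed start_date over a
    # dynamic value such as datetime.now().
    'start_date': datetime.now() - timedelta(days=3),
    # 'end_date': datetime(2018, 12, 30),
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # Retry a failed task once, five minutes after the failure
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}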
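
To make the role of these defaults concrete, the following is a minimal plain-Python sketch of the idea: each task starts from the shared defaults, and any argument passed to the task itself takes precedence. The helper resolve_task_args is hypothetical and is not part of Airflow; it only illustrates the precedence rule, not how Airflow implements it.

```python
from datetime import timedelta

# Shared defaults, mirroring a subset of the args dictionary above.
default_args = {
    "owner": "Admin",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def resolve_task_args(defaults, **task_overrides):
    """Hypothetical helper: merge DAG-level defaults with task-level values.

    Values passed for an individual task win over the shared defaults.
    """
    return {**defaults, **task_overrides}


# A task that overrides only the retry count.
resolved = resolve_task_args(default_args, retries=3)

# One initial attempt plus the configured number of retries.
max_attempts = 1 + resolved["retries"]
print(resolved["owner"], resolved["retries"], max_attempts)
```

With the unmodified defaults (retries set to 1), a task would be attempted at most twice: once initially and once more after the retry delay.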

Step 3: Creating a DAG

The third step is to create the DAG itself by giving it a name, the default arguments, and a schedule interval, as shown below.

In [12]:
dag = DAG(
    'pramod_airflow_dag',
    default_args=args,
    description='A simple DAG',
    # Continue to run DAG once per day
    schedule=timedelta(days=1),
)
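
As a rough illustration of what a one-day schedule means, the sketch below lists the daily intervals between a start date and the current time using only the standard library. The function daily_intervals and the fixed dates are assumptions made for the example; Airflow's scheduler performs this bookkeeping itself, and whether past intervals are actually run depends on settings such as catchup.

```python
from datetime import datetime, timedelta


def daily_intervals(start, now, step=timedelta(days=1)):
    """Hypothetical helper: list the (start, end) intervals completed by `now`."""
    intervals = []
    current = start
    # An interval counts only once its end has passed.
    while current + step <= now:
        intervals.append((current, current + step))
        current += step
    return intervals


# Fixed example dates so the result is reproducible.
now = datetime(2024, 1, 4, 12, 0)
start = datetime(2024, 1, 1, 12, 0)  # three days before `now`

intervals = daily_intervals(start, now)
for begin, end in intervals:
    print(begin.isoformat(), "->", end.isoformat())
```

With a start date three days in the past and a one-day step, the sketch yields three completed intervals.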

Step 4: Declaring Tasks

The next step is to declare the tasks (the actual jobs) to be executed. Each task is made part of the DAG created in the preceding step by passing that DAG as the dag argument.

In [14]:
task_1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

In [15]:
task_2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)

Step 5: Mentioning Dependencies

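The final step is to set the order of task execution. Tasks can run in parallel or sequentially. There are multiple ways in which the dependencies can be defined: with the bitshift operators (>> and <<) or with the set_downstream and set_upstream methods. The following makes task_2 run after task_1.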

In [16]:
task_1 >> task_2

<Task(BashOperator): sleep>
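
The ordering that task_1 >> task_2 expresses can be illustrated without Airflow. The sketch below uses graphlib.TopologicalSorter from the Python standard library (Python 3.9 or later) to derive a valid execution order from a mapping of each task to the tasks it depends on. The dependencies mapping is an assumption made for the example and only reuses the task IDs from above; it is not how Airflow stores the graph.

```python
from graphlib import TopologicalSorter

# Map each task ID to the set of task IDs that must finish first.
dependencies = {
    "print_date": set(),
    "sleep": {"print_date"},  # equivalent in spirit to task_1 >> task_2
}

# static_order() yields the tasks so that every task follows its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because sleep depends on print_date, any valid order places print_date first.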