In [None]:
# default_exp airflow

# airflow

> API details.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

In [None]:
#export
default_args = {
    'owner': 'insai',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['mark@insai.tech'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

In [None]:
#export
dag = DAG('insai-test', default_args=default_args, description='Simple test DAG',schedule_interval=timedelta(days=1))

## Tasks

In [None]:
#export
t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)

In [None]:
#export
t2 = BashOperator(task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, dag=dag)

Task documentation

In [None]:
#export
t1.doc_md = """\
#### Task Documentation
"""

templating with `jinja`

In [None]:
#export
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

In [None]:
#export
t3 = BashOperator(task_id='templated', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag)

Defining dependencies

`t1.set_downstream(t2)`

This means that t2 will depend on t1 running successfully to run. It is equivalent to:

`t2.set_upstream(t1)`

The bit shift operator can also be used to chain operations:

`t1 >> t2`

And the upstream dependency with the bit shift operator:

`t2 << t1`

Chaining multiple dependencies becomes concise with the bit shift operator:

`t1 >> t2 >> t3`

A list of tasks can also be set as dependencies. These operations all have the same effect:

`t1.set_downstream([t2, t3])`

`t1 >> [t2, t3]`

`[t2, t3] << t`

In [None]:
#export
t1 >> [t2, t3]

[<Task(BashOperator): sleep>, <Task(BashOperator): templated>]