In [None]:
#Apache Airflow is an open-source platform to Author, Schedule and Monitor workflows. 
#It was created at Airbnb and currently is a part of Apache Software Foundation. 
#Airflow helps you to create workflows using Python programming language and these workflows can be scheduled 
#and monitored easily with it.

#### DAG - Directed Acyclic Graph

In [None]:
#Define uma sequência de tarefas, com dependências

#### Run a workflow

In [None]:
# These are shell commands (run them in a terminal, not as Python code).
#
# General form:
#   airflow run <dag_id> <task_id> <start_date>
#
# Example:
#   airflow run example-etl download-file 2020-01-10

#### Define a DAG

In [5]:
from airflow.models import DAG
from datetime import datetime

# Optional, but it is good practice to always provide them.
# This mapping is handed to the DAG below through `default_args`.
default_arguments = dict(
    owner='preco',
    email='rodrigopreco@gmail.com',
    start_date=datetime(2021, 4, 21),
    retries=2,
)

# DAG identified as 'etl_workflow', built with the defaults above.
etl_dag = DAG(dag_id='etl_workflow', default_args=default_arguments)

In [None]:
# The airflow command line program contains many subcommands.
# These are shell commands (run them in a terminal, not as Python code).
#
# Show descriptions of the subcommands:
#   airflow -h
#
# Many subcommands are related to DAGs. To show all recognized DAGs:
#   airflow list_dags

#### BashOperator

In [6]:
# BashOperator
# Executes a bash command
# Runs the command in a temporary directory
# Environment variables can be specified
from airflow.operators.bash_operator import BashOperator

# Both tasks are attached to `etl_dag`, the DAG created in the "Define a DAG" cell.

# Inline command: echoes a fixed string.
BashOperator(
            task_id='bash_example',
            bash_command='echo "Example!"',
            dag=etl_dag)

# Script invocation: the command is the name of a shell script.
BashOperator(task_id='bash_script_example',
             bash_command='runcleanup.sh',
             dag=etl_dag)

In [None]:
from airflow.operators.bash_operator import BashOperator
# Both tasks are registered on `etl_dag`, the DAG created in the "Define a DAG" cell.

# Minimal task: echoes "1".
example_task = BashOperator(task_id='bash_ex',
                            bash_command='echo 1',
                            dag=etl_dag)

# Keeps only the lines of addresses.txt that have exactly 10 fields (awk NF==10)
# and writes them to cleaned.txt.
bash_task = BashOperator(task_id='clean_addresses',
                            bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                            dag=etl_dag)

#### PythonOperator

In [None]:
#Executes a Python function / callable
#Operates similarly to the BashOperator, with more options
#Can pass in arguments to the Python code
from airflow.operators.python_operator import PythonOperator
def printme():
    """Print a fixed message to stdout and return ``None``."""
    message = "This goes in the logs!"
    print(message)
    return None
# Task whose callable is `printme`.
# NOTE(review): `example_dag` is not defined in the cells shown above — confirm where it comes from.
python_task = PythonOperator(task_id='simple_print',
                             python_callable=printme,
                             dag=example_dag)

def sleep(length_of_time):
    """Block for ``length_of_time`` seconds.

    Args:
        length_of_time: Number of seconds to pause, forwarded to ``time.sleep``.

    Returns:
        ``None`` (the return value of ``time.sleep``).
    """
    # Local import keeps the callable self-contained; this function's name
    # shadows nothing in the `time` module because the module is accessed by name.
    import time

    return time.sleep(length_of_time)
# Task whose callable is `sleep`; `op_kwargs` supplies the keyword argument
# `length_of_time=5` for that callable.
# NOTE(review): `example_dag` is not defined in the cells shown above — confirm where it comes from.
sleep_task = PythonOperator(
                            task_id='sleep',
                            python_callable=sleep,
                            op_kwargs={'length_of_time': 5},
                            dag=example_dag
                            )

#### EmailOperator

In [None]:
#Found in the airflow.operators library
#Sends an email
#Can contain typical components
#HTML content
#Attachments
#Does require the Airflow system to be configured with email server details

from airflow.operators.email_operator import EmailOperator
# Task that emails the sales report to the sales manager.
# `files` takes a list of attachment paths, so the single file is wrapped in a list.
# NOTE(review): `example_dag` is not defined in the cells shown above — confirm where it comes from.
email_task = EmailOperator(
                            task_id='email_sales_report',
                            to='sales_manager@example.com',
                            subject='Automated Sales Report',
                            html_content='Attached is the latest sales report',
                            files=['latest_sales.xlsx'],
                            dag=example_dag
                            )

#### Tasks

In [None]:
#Instances of operators
#Usually assigned to a variable in Python
#Referred to by the task_id within the Airflow tools

# A task is an operator instance bound to a Python variable; Airflow refers to it
# by its task_id ('bash_example').
# NOTE(review): assumes `dag` is a DAG object defined before this cell runs — confirm.
example_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Example!"',
    dag=dag,
)


In [None]:
#Task dependencies define a given order of task completion
#Are not required for a given workflow, but usually present in most
#Are referred to as upstream or downstream tasks
#In Airflow 1.8 and later, are defined using the bitshift operators
#>>, or the upstream operator
#<<, or the downstream operator
#Upstream means before
#Downstream means after

# Create the two tasks.
# NOTE(review): `example_dag` is not defined in the cells shown above — confirm where it comes from.
task1 = BashOperator(
    task_id='first_task',
    bash_command='echo 1',
    dag=example_dag,
)
task2 = BashOperator(
    task_id='second_task',
    bash_command='echo 2',
    dag=example_dag,
)

# first_task must run before second_task (equivalently: task2 << task1).
task1 >> task2


#### Schedule

In [None]:
#start_date - The date / time to initially schedule the DAG run
#end_date - Optional for when to stop running new DAG instances
#max_tries - Optional attribute for how many attempts
#schedule_interval
    #How often to schedule the DAG
    #Between the start_date and end_date
    #Can be defined via cron style syntax or via built-in presets.

from datetime import timedelta

# schedule_interval=None (the Python value, not a string) means the DAG is never
# scheduled automatically; the string presets are values such as '@once',
# '@hourly' or '@daily'.
test_dag = DAG('test_workflow', start_date=datetime(2020,2,20), schedule_interval=None)

#### Cron Syntax

In [None]:
#Is pulled from the Unix cron format
#0 12 * * * # Run daily at noon
#* * 25 2 * # Run once per minute on February 25
#0,15,30,45 * * * * # Run every 15 minutes

#Preset:
#@hourly
#@daily
#@weekly
#@monthly
#@yearly
#cron equivalent:
#0 * * * *
#0 0 * * *
#0 0 * * 0
#0 0 1 * *
#0 0 1 1 *

from airflow.models import DAG
from datetime import datetime

# Runs every day at midnight (cron "0 0 * * *").
# Reuses `default_arguments`, the defaults dictionary built in the "Define a DAG" cell.
report_dag = DAG(
                    dag_id = 'execute_report',
                    schedule_interval = "0 0 * * *",
                    default_args=default_arguments
                )

#### Sensors

In [None]:
#What is a sensor?
#An operator that waits for a certain condition to be true
#Creation of a file
#Upload of a database record
#Certain response from a web request
#Can define how often to check for the condition to be true
#Are assigned to tasks

from airflow.contrib.sensors.file_sensor import FileSensor
# Sensor task that waits for 'salesdata.csv', checking at a poke_interval of 300.
# NOTE(review): `sales_report_dag`, `init_sales_cleanup` and `generate_report`
# are not defined in the cells shown above — confirm where they come from.
file_sensor_task = FileSensor(
    task_id='file_sense',
    filepath='salesdata.csv',
    poke_interval=300,
    dag=sales_report_dag,
)

# Order: cleanup first, then wait for the file, then generate the report.
init_sales_cleanup >> file_sensor_task >> generate_report

#Other Sensors
#ExternalTaskSensor - wait for a task in another DAG to complete
#HttpSensor - Request a web URL and check for content
#SqlSensor - Runs a SQL query to check for content
#Many others in airflow.sensors and airflow.contrib.sensors



#### SLA

In [None]:
#An SLA stands for Service Level Agreement
#Within Airflow, the amount of time a task or a DAG should require to run
#An SLA Miss is any time the task / DAG does not meet the expected timing
#If an SLA is missed, an email is sent out and a log is stored.
#You can view SLA misses in the web UI.

# An SLA can be declared in two places; both are shown here.

# 1) On the default_args dictionary given to the DAG.
default_args={
                'sla': timedelta(minutes=20),
                'start_date': datetime(2020,2,20)
                }

# The DAG is created before the task below so that `dag` exists when the task uses it.
dag = DAG('sla_dag', default_args=default_args)

# 2) Using the 'sla' argument on the task itself (30 seconds for this task).
task1 = BashOperator(task_id='sla_task',
                    bash_command='runcode.sh',
                    sla=timedelta(seconds=30),
                    dag=dag)

#### Timedelta object

In [None]:
#In the datetime library
#Accessed via from datetime import timedelta
#Takes arguments of days, seconds, minutes, hours, and weeks
#timedelta(seconds=30)
#timedelta(weeks=2)
#timedelta(days=4, hours=10, minutes=20, seconds=30)

#### Templates

In [None]:
#Allow substituting information during a DAG run
#Provide added flexibility when defining tasks
#Are created using the Jinja templating language

# Jinja template: {{ params.filename }} is replaced with the value supplied
# through the task's `params` dictionary.
templated_command="""
echo "Reading {{ params.filename }}"
"""

# NOTE(review): `example_dag` is not defined in the cells shown above — confirm where it comes from.
t1 = BashOperator(task_id='template_task',
                    bash_command=templated_command,
                    params={'filename': 'file1.txt'},
                    dag=example_dag)


#### Variables

In [None]:
#Airflow built-in runtime variables
#Provides assorted information about DAG runs, tasks, and even the system configuration.
#Examples include:
#Execution Date: {{ ds }} # YYYY-MM-DD
#Execution Date, no dashes: {{ ds_nodash }} # YYYYMMDD
#Previous Execution date: {{ prev_ds }} # YYYY-MM-DD
#Prev Execution date, no dashes: {{ prev_ds_nodash }} # YYYYMMDD
#DAG object: {{ dag }}
#Airflow config object: {{ conf }}

#### Macros

In [None]:
#In addition to others, there is also a {{ macros }} variable.
#This is a reference to the Airflow macros package which provides various useful objects /
#methods for Airflow templates.
#{{ macros.datetime }} : The datetime.datetime object
#{{ macros.timedelta }} : The timedelta object
#{{ macros.uuid }} : Python's uuid object
#{{ macros.ds_add('2020-04-15', 5) }} : Modify days from a date, this example returns
#2020-04-20

#### Branching

In [None]:
#Branching in Airflow:
#Provides conditional logic
#Using BranchPythonOperator
#from airflow.operators.python_operator import BranchPythonOperator
#Takes a python_callable to return the next task id (or list of ids) to follow

def branch_test(**kwargs):
    """Return the id of the task to follow, based on the parity of ``ds_nodash``.

    Args:
        **kwargs: Must contain ``ds_nodash``, a value convertible with ``int()``.

    Returns:
        ``'even_day_task'`` when the integer value is even, otherwise ``'odd_day_task'``.
    """
    date_number = int(kwargs['ds_nodash'])
    is_even = date_number % 2 == 0
    return 'even_day_task' if is_even else 'odd_day_task'
from airflow.operators.python_operator import BranchPythonOperator

# Branching task: `branch_test` returns the task id to follow.
branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)

# Two paths leave branch_task, one for even days and one for odd days.
# NOTE(review): start_task, even_day_task, even_day_task2, odd_day_task and
# odd_day_task2 are not defined in this cell — assumed to be created elsewhere.
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2

#### Pipeline Completo - Example 1

In [None]:
#Production pipeline
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime

# Default arguments applied to the DAG: a start date and a 90-minute SLA.
default_args = dict(
    start_date=datetime(2019, 1, 1),
    sla=timedelta(minutes=90),
)

dag = DAG('etl_update', default_args=default_args)

# Waits for the start-process marker file, with a poke_interval of 45.
sensor = FileSensor(
    task_id='sense_file',
    filepath='/home/repl/workspace/startprocess.txt',
    poke_interval=45,
    dag=dag,
)

# Removes the *.tmp files under /home/repl.
bash_task = BashOperator(
    task_id='cleanup_tempfiles',
    bash_command='rm -f /home/repl/*.tmp',
    dag=dag,
)

# Runs `process_data` (imported from dags.process) with provide_context=True.
python_task = PythonOperator(
    task_id='run_processing',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)

# Pipeline order: sensor, then cleanup, then processing.
sensor >> bash_task >> python_task

#### Pipeline Completo - Example 2

In [None]:
#Enviando emails condicionais
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

# Default arguments for the DAG: start date plus a 90-minute SLA.
default_args = {'start_date': datetime(2019, 1, 1), 'sla': timedelta(minutes=90)}

dag = DAG(
    dag_id='etl_update',
    default_args=default_args,
)

# Sensor: waits for the start-process marker file, with a poke_interval of 45.
sensor = FileSensor(
    task_id='sense_file',
    filepath='/home/repl/workspace/startprocess.txt',
    poke_interval=45,
    dag=dag,
)

# Cleanup: deletes the *.tmp files under /home/repl.
bash_task = BashOperator(
    task_id='cleanup_tempfiles',
    bash_command='rm -f /home/repl/*.tmp',
    dag=dag,
)

# Processing: calls `process_data` (imported from dags.process) with provide_context=True.
python_task = PythonOperator(
    task_id='run_processing',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)


# Templated subject line: uses the task's `params.department` and the
# `ds_nodash` template variable.
email_subject = """
  Email report for {{ params.department }} on {{ ds_nodash }}
"""

# Branch that sends the report email (the body is left empty).
email_report_task = EmailOperator(
    task_id='email_report_task',
    to='sales@mycompany.com',
    subject=email_subject,
    html_content='',
    params={'department': 'Data subscription services'},
    dag=dag,
)

# Branch that does nothing, used when no email should be sent.
no_email_task = DummyOperator(
    task_id='no_email_task',
    dag=dag,
)


def check_weekend(**kwargs):
    """Choose the branch to follow from the weekday of the execution date.

    Args:
        **kwargs: Task context. Must contain ``execution_date``, either as a
            datetime-like object exposing ``weekday()`` or as a
            ``"%Y-%m-%d"`` formatted string.

    Returns:
        ``'email_report_task'`` for Monday-Friday, ``'no_email_task'`` for
        Saturday and Sunday.

    Raises:
        KeyError: If ``execution_date`` is not present.
        ValueError: If a string value does not match ``"%Y-%m-%d"``.
    """
    dt = kwargs['execution_date']
    # strptime() only accepts strings and raises TypeError on a datetime object,
    # so parse only when the value is text and use a datetime as-is.
    if isinstance(dt, str):
        dt = datetime.strptime(dt, "%Y-%m-%d")
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    return 'no_email_task'
    
    
# Branching task: `check_weekend` returns the id of the task to follow.
branch_task = BranchPythonOperator(
    task_id='check_if_weekend',
    python_callable=check_weekend,
    provide_context=True,
    dag=dag,
)

# Full pipeline: sensor, cleanup, processing, then the branch decision,
# which leads to either the email task or the no-op task.
sensor >> bash_task >> python_task >> branch_task >> [email_report_task, no_email_task]

In [None]:
#saiba mais em : https://airflow.apache.org/docs