In [None]:
# Python and Bash operators
# Basic, very simple DAG

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

# Start date: midnight at the beginning of the previous day. This value is
# recomputed every time the file is parsed.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Default arguments applied to every task in the DAG.
default_dag_args = dict(start_date=yesterday)

# The DAG object, scheduled to run once a day.
with models.DAG(
        'running_python_and_bash_operator',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def hello_world():
        """Print a hello-world line and return 1."""
        print('Hello World!')
        return 1

    def greeting():
        """Print the SpikeySales greeting and return a confirmation string."""
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    # Three tasks: two PythonOperators that call the plain Python functions
    # above, and one BashOperator that runs a shell command.
    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1', python_callable=hello_world)
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='python_2', python_callable=greeting)
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash', bash_command='echo Goodbye! Hope to see you soon.')

    # Run order: python_1 -> python_2 -> bye_bash.
    hello_world_greeting.set_downstream(spikeysales_greeting)
    spikeysales_greeting.set_downstream(bash_greeting)

In [None]:
# all_success_python_bash.py
# using trigger rule

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.utils import trigger_rule

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Every task retries once, two minutes after a failure.
default_dag_args = dict(
    start_date=yesterday,
    retries=1,
    retry_delay=datetime.timedelta(minutes=2),
)


with models.DAG(
        'python_and_bash_with_all_success_trigger',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def hello_world():
        """Always fail, to demonstrate how a failure affects downstream tasks."""
        raise ValueError('Oops! something went wrong.')
        # Unreachable: kept to show what the task would do without the error.
        print('Hello World!')
        return 1

    def greeting():
        """Print the SpikeySales greeting and return a confirmation string."""
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1', python_callable=hello_world)
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='python_2', python_callable=greeting)

    # ALL_SUCCESS runs bye_bash only when every upstream task succeeded.
    # Because python_1 always raises, bye_bash is not expected to run here.
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS)

    # Run order: python_1 -> python_2 -> bye_bash.
    hello_world_greeting.set_downstream(spikeysales_greeting)
    spikeysales_greeting.set_downstream(bash_greeting)

In [None]:
# one_failed_python_bash.py

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.utils import trigger_rule

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Every task retries once, two minutes after a failure.
default_dag_args = dict(
    start_date=yesterday,
    retries=1,
    retry_delay=datetime.timedelta(minutes=2),
)


with models.DAG(
        'python_and_bash_with_one_failed_trigger',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def hello_world():
        """Always fail, so the ONE_FAILED rule downstream has a failure to see."""
        raise ValueError('Oops! something went wrong.')
        # Unreachable: kept to show what the task would do without the error.
        print('Hello World!')
        return 1

    def greeting():
        """Print the SpikeySales greeting and return a confirmation string."""
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1', python_callable=hello_world)
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='python_2', python_callable=greeting)

    # ONE_FAILED lets bye_bash run as soon as an upstream task has failed,
    # instead of requiring success.
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule=trigger_rule.TriggerRule.ONE_FAILED)

    # Run order: python_1 -> python_2 -> bye_bash.
    hello_world_greeting.set_downstream(spikeysales_greeting)
    spikeysales_greeting.set_downstream(bash_greeting)

In [None]:
# ALL_DONE trigger rule
# Every Python task raises an error

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.utils import trigger_rule

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Every task retries once, two minutes after a failure.
default_dag_args = dict(
    start_date=yesterday,
    retries=1,
    retry_delay=datetime.timedelta(minutes=2),
)


with models.DAG(
        'python_and_bash_with_all_done_trigger',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def hello_world():
        """Always fail with ValueError (demo)."""
        raise ValueError('Oops! something went wrong.')
        # Unreachable: kept to show what the task would do without the error.
        print('Hello World!')
        return 1

    def greeting():
        """Always fail with TypeError (demo)."""
        raise TypeError('Incorrect type.')
        # Unreachable: kept to show what the task would do without the error.
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1', python_callable=hello_world)
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='python_2', python_callable=greeting)

    # ALL_DONE runs bye_bash once its upstream tasks have finished, whatever
    # their final state, so it runs even though both Python tasks fail.
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Run order: python_1 -> python_2 -> bye_bash.
    hello_world_greeting.set_downstream(spikeysales_greeting)
    spikeysales_greeting.set_downstream(bash_greeting)

In [None]:
# Working with stats (SciPy).
# If you simply upload and run this DAG it will fail, because the Airflow
# environment does not have SciPy installed. In Composer, open "PyPI packages",
# click edit, and add "scipy>=1.1.0".

import datetime
from scipy import stats

from airflow import models
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Every task retries once, two minutes after a failure.
default_dag_args = {
    'start_date': yesterday,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2)
}

with models.DAG(
        'finding_the_most_common_element',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    def print_most_common_number():
        """Print scipy's mode result for a fixed list of sample numbers.

        Returns:
            A confirmation string.
        """
        # The sample values must be numbers, not strings. SciPy deprecated
        # non-numeric input to `stats.mode` in 1.9.0 and removed it in 1.11.0
        # (it raises TypeError), and the "scipy>=1.1.0" requirement does not
        # exclude those releases. The most common value is 5 either way.
        num = stats.mode([9, 5, 2, 5, 1, 6])
        print(num)
        return 'Successfully printed most common element!'

    # The DAG's only task; it needs no dependency wiring.
    printing_most_common_element = python_operator.PythonOperator(
        task_id='most_common_number',
        python_callable=print_most_common_number)

In [None]:
# using DummyOperator

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.operators import dummy_operator

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Default arguments applied to every task in the DAG.
default_dag_args = dict(start_date=yesterday)


with models.DAG(
        'running_python_bash_and_dummy_operator',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def hello_world():
        """Print a hello-world line and return 1."""
        print('Hello World!')
        return 1

    def greeting():
        """Print the SpikeySales greeting and return a confirmation string."""
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1', python_callable=hello_world)
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='python_2', python_callable=greeting)
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash', bash_command='echo Goodbye! Hope to see you soon.')
    # A DummyOperator task that only marks the end of the chain.
    end = dummy_operator.DummyOperator(task_id='dummy')

    # Run order: python_1 -> python_2 -> bye_bash -> dummy.
    hello_world_greeting.set_downstream(spikeysales_greeting)
    spikeysales_greeting.set_downstream(bash_greeting)
    bash_greeting.set_downstream(end)

In [None]:
# Branching: draw a random number; if it is small (<= 2) run the
# hello_spikey branch, otherwise run the dummy branch

import random
import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from airflow.operators import dummy_operator

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# Default arguments applied to every task in the DAG.
default_dag_args = dict(start_date=yesterday)

with models.DAG(
        'branching_python_operator',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    def greeting():
        """Print the SpikeySales greeting and return a confirmation string."""
        print('Greetings from SpikeySales! Happy shopping.')
        return 'Greeting successfully printed.'

    def makeBranchChoice():
        """Choose which branch this run follows.

        Draws one random integer in [1, 5]. Returns 'hello_spikey' when it is
        at most 2, otherwise 'dummy'. The returned value is the task_id the
        BranchPythonOperator follows.
        """
        return 'hello_spikey' if random.randint(1, 5) <= 2 else 'dummy'

    run_this_first = dummy_operator.DummyOperator(task_id='run_this_first')
    branching = python_operator.BranchPythonOperator(
        task_id='branching', python_callable=makeBranchChoice)

    # Branch A: hello_spikey -> follow_python.
    spikeysales_greeting = python_operator.PythonOperator(
        task_id='hello_spikey', python_callable=greeting)
    dummy_followed_python = dummy_operator.DummyOperator(task_id='follow_python')

    # Branch B: a single dummy task.
    dummy = dummy_operator.DummyOperator(task_id='dummy')

    # Join point of both branches. 'one_success' lets it run when either
    # branch succeeds, even though the branch not chosen does not run.
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule='one_success')

    run_this_first.set_downstream(branching)
    branching.set_downstream([spikeysales_greeting, dummy])
    spikeysales_greeting.set_downstream(dummy_followed_python)
    bash_greeting.set_upstream([dummy_followed_python, dummy])


In [None]:
# Run a query via BigQuery and
# send an email notification via SendGrid.
# Before you can use SendGrid you have to set up the
# SendGrid API!


import datetime
import time

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_to_gcs
from airflow.operators import email_operator
from airflow.contrib.operators import bigquery_table_delete_operator
from airflow.utils import trigger_rule

# Run timestamp used to label the export file and the notification email.
# It is a Jinja template that Airflow renders when each task executes. It is
# deliberately not computed here: module-level code runs every time the
# scheduler or a worker parses this file, so a wall-clock time taken at this
# point differs between the export task and the email task of the same run.
# `ts_nodash` is the run's logical (execution) date, e.g. 20180101T000000.
# It contains no spaces or colons, which keeps the GCS object name clean.
updated_time = '{{ ts_nodash }}'

bq_dataset_name = models.Variable.get('bq_dataset_name')
# Temporary table that holds the query result until it is exported.
bq_products_on_sale_table_id = bq_dataset_name + '.temp_sale_table'
# The template in `updated_time` is passed as a format *value*, so its braces
# survive `.format()` and are rendered later by Airflow
# (destination_cloud_storage_uris is a templated field).
output_file = '{gcs_bucket}/products_on_sale_{current_time}.csv'.format(
    gcs_bucket=models.Variable.get('gcs_bucket'), current_time=updated_time)

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

email_id = models.Variable.get('email')

# Email on task failure (not on retry); retry once after two minutes.
default_dag_args = {
    'start_date': yesterday,
    'email': email_id,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'current_products_on_sale_notification',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Select products currently on sale into the temporary table, replacing
    # any previous contents.
    query_current_sales_products = bigquery_operator.BigQueryOperator(
        task_id='query_products_on_sale',
        bql="""
        SELECT product_id, product_name
        FROM `{bq_table_id}` 
        WHERE sale = True
        """.format(bq_table_id=models.Variable.get('bq_table_id')),
        use_legacy_sql=False,
        destination_dataset_table=bq_products_on_sale_table_id,
        write_disposition='WRITE_TRUNCATE')

    # Export the temporary table to a CSV file in GCS.
    export_data_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_sale_data_to_gcs',
        source_project_dataset_table=bq_products_on_sale_table_id,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')

    # Notify only if everything upstream succeeded. html_content is a
    # templated field, so the same run timestamp as the file name is shown.
    email_updation_notification = email_operator.EmailOperator(
        task_id='email_notification',
        to=email_id,
        subject='Sale product data updated',
        html_content="""
        Updated sale products for {current_time}.
        """.format(current_time=updated_time),
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS)

    # Drop the temporary table once the notification has been sent.
    delete_bq_table = bigquery_table_delete_operator.BigQueryTableDeleteOperator(
        task_id='delete_bigquery_table',
        deletion_dataset_table=bq_products_on_sale_table_id)

    (
        query_current_sales_products
        >> export_data_to_gcs
        >> email_updation_notification
        >> delete_bq_table
    )

In [None]:
# working with hadoop based dataproc

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# GCS output directory for this job: <gcs_bucket>/wordcount/<timestamp>/.
# The timestamp is taken when this file is parsed.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'),
    'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S'),
) + os.sep

# Hadoop MapReduce examples jar, referenced by a local file:// path on the
# cluster nodes.
WORDCOUNT_JAR = 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'

# Input document for the wordcount program.
# NOTE(review): the bucket is hard-coded; consider reading it from a Variable.
WORDCOUNT_INPUT = 'gs://us-central1-spikey-composer-486ba46f-bucket/dags/doc.txt'

# Program name, input path, output directory.
wordcount_args = ['wordcount', WORDCOUNT_INPUT, output_file]

# One cluster per logical date; the name is rendered from the run's ds_nodash.
CLUSTER_NAME = 'spikey-wordcount-cluster-{{ ds_nodash }}'

# The start date is midnight of the previous day, evaluated at parse time.
yesterday = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min)

# No failure/retry emails; retry once after five minutes.
default_dag_args = dict(
    start_date=yesterday,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=datetime.timedelta(minutes=5),
    project_id=models.Variable.get('gcp_project'),
)


with models.DAG(
        'composer_hadoop_wordcount',
        default_args=default_dag_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Create a cluster with two n1-standard-1 workers for this run.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name=CLUSTER_NAME,
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the wordcount example from the examples jar on that cluster.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name=CLUSTER_NAME,
        arguments=wordcount_args)

    # Delete the cluster once the job has finished, whether it succeeded or
    # failed (ALL_DONE), so the cluster is not left running.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name=CLUSTER_NAME,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Run order: create -> run job -> delete.
    create_dataproc_cluster.set_downstream(run_dataproc_hadoop)
    run_dataproc_hadoop.set_downstream(delete_dataproc_cluster)