<div style="line-height:1.2;">

<h1 style="color:darkturquoise; margin-bottom: 0.3em;">Airflow tutorial 2 </h1>

<p style="margin-top: 0.5em; margin-bottom: 1.5em;"><strong> Data pipelines (work in progress..) </strong></p>

<div style="line-height:1.4; margin-bottom: 1em;">
    <h3 style="color: lightblue; display: inline; margin-right: 0.5em;">Keywords:</h3>
    <span style="display: inline;">DAG creation + @dag and @task decorators + DagBag + annotations + LoggingMixin + Executor + smtplib </span>
</div>

<div style="line-height:1.4; margin-top: 1em;">
    <h3 style="color: red; display: inline; margin-right: 0.5em;">Notes:</h3>
    <span style="display: inline;">
    Jupyter Notebooks must be converted to Python scripts to be run by Airflow, since notebooks are not designed for task scheduling and workflow management. <br>
    All the DAG ".py" files need to be placed in Airflow's dags folder => ~/airflow/dags (or the dags_folder set in the "airflow.cfg" config file) <br>
    In Apache Airflow, it is possible to define multiple DAGs in the same Python file: a separate file for each DAG is not mandatory. <br>
    </span>
</div>

</div>

In [40]:
from __future__ import annotations

import json
import random
import pendulum
from datetime import datetime, timedelta
from airflow.notifications.basenotifier import BaseNotifier

from airflow import DAG
#OR 
#from airflow.models.dag import DAG
# both import statements are equivalent and will have the same effect in your Airflow scripts.

from airflow.models import DagBag
from airflow.decorators import dag, task
from airflow.utils.context import Context
from airflow.utils.email import send_email
from airflow.operators.bash import BashOperator
from airflow.utils.log.logging_mixin import LoggingMixin

<h3 style="color: darkturquoise;"> Recap: Python annotations </h3>
<div style="margin-top: -8px;">
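=> Annotations attach type hints to function parameters and return values, e.g. def transform(combined_data_dict: dict). They document the expected input and output types of the tasks. <br>
The statement "from __future__ import annotations" (PEP 563) enables postponed evaluation: annotations are stored as strings and are not evaluated when the function is defined. <br>
In this way type hints can refer to names defined later (forward references) and cost nothing at import time, while tools can still resolve them when needed. Python does not enforce them at runtime.
</div>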
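A minimal illustration of what postponed evaluation changes: with the future import, the annotations of a function are kept as plain strings and are only turned back into real types on request, e.g. with typing.get_type_hints. The function total_value is a hypothetical example, not part of the pipeline.

```python
from __future__ import annotations  # must be the first statement of the module

import typing


def total_value(combined_data_dict: dict) -> int:
    """Hypothetical helper: annotations are metadata only and are not enforced at call time."""
    return sum(v for v in combined_data_dict.values() if type(v) is int)


# With postponed evaluation, annotations are stored as strings...
print(total_value.__annotations__)        # {'combined_data_dict': 'dict', 'return': 'int'}
# ...and resolved into real types only when explicitly requested.
print(typing.get_type_hints(total_value))  # {'combined_data_dict': <class 'dict'>, 'return': <class 'int'>}
```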

<h3 style="color: darkturquoise;"> Recap: LoggingMixin </h3>
<div style="margin-top: -8px;">
The LoggingMixin can be used to get a logger (the "log" attribute) configured with the class name. <br>
It is often used to simplify logging in custom operators, sensors, hooks, or other Airflow components, to ensure that your custom Airflow components log information in a way that aligns with the Airflow framework's logging conventions.
</div>
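The idea behind the mixin can be sketched with the standard library alone. The class below is a simplified stand-in written for illustration (SimpleLoggingMixin and MyCustomHook are hypothetical names, not Airflow code): it exposes a log property that returns a logger named after the concrete class, roughly as Airflow's LoggingMixin does. With Airflow installed, the real mixin is used the same way: subclass LoggingMixin and call self.log.info(...).

```python
import logging


class SimpleLoggingMixin:
    """Stdlib-only stand-in for the idea behind Airflow's LoggingMixin (not its real code)."""

    @property
    def log(self) -> logging.Logger:
        cls = type(self)
        # Logger named "<module>.<ClassName>", so every record shows which component emitted it
        return logging.getLogger(f"{cls.__module__}.{cls.__name__}")


class MyCustomHook(SimpleLoggingMixin):  # hypothetical custom component
    def fetch(self) -> int:
        self.log.info("Fetching data...")
        return 42


logging.basicConfig(level=logging.INFO, format="%(name)s - %(levelname)s - %(message)s")
hook = MyCustomHook()
hook.fetch()          # emits "Fetching data..." through a logger whose name ends with ".MyCustomHook"
print(hook.log.name)
```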

<h2 style="color: darkturquoise;"> Example #5 </h2>
Simple data pipeline to perform Extract, Transform, and Load procedures in 3 tasks.

In [16]:
""" 
N.B.1
The '@dag' and '@task' decorators are part of the Airflow TaskFlow API.
They provide a Pythonic way to create and manage workflows in Airflow:
'@dag' turns a regular Python function into a DAG (Directed Acyclic Graph) and '@task' turns a function into a task of that DAG,
adding the metadata and configuration settings passed as decorator arguments.
"""
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example_5"],
)
def function_5_create_data_pipeline_for_ETL():
    @task()
    def extract():
        """ Simulate the extraction of data from JSON-formatted strings. """
        ##### Data strings with various types of content
        data_string1 = json.dumps({f"key_{i}": f"value_{i}" for i in range(1, 11)})
        data_ints2 = json.dumps({f"int_key_{i}": i * 100 for i in range(1, 11)})
        data_values3 = json.dumps({f"list_key_{i}": [i, i * 2, i * 3] for i in range(1, 11)})
        data_bools4 = json.dumps({f"bool_key_{i}": i % 2 == 0 for i in range(1, 11)})
        data_nest5 = json.dumps({f"nested_key_{i}": {"subkey": i, "subvalue": f"sub_{i}"} for i in range(1, 11)})
        ##### Parse the JSON formatted strings and convert them into Python dictionaries
        ddict1 = json.loads(data_string1)
        ddict2 = json.loads(data_ints2)
        ddict3 = json.loads(data_values3)
        ddict4 = json.loads(data_bools4)
        ddict5 = json.loads(data_nest5)
        # Combine all dictionaries into one
        combined_data = {**ddict1, **ddict2, **ddict3, **ddict4, **ddict5}

        return combined_data
    ####################################################################################
    @task(multiple_outputs=True)
    def transform(combined_data_dict: dict):
        """ Transform a collection of diverse data types by computing the total value of integer entries. """
        total_order_value = 0
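        ### Loop through the values and sum up only the integer values
        for value in combined_data_dict.values():
            # bool is a subclass of int: exclude True/False so they are not summed as 1/0
            if isinstance(value, int) and not isinstance(value, bool):
                total_order_value += value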

        return {"total_order_value": total_order_value}
    ####################################################################################
    @task()
    def load(total_order_value: float):
        """ Simulate the load by printing the result of the Transform task, instead of saving it. """
        print(f"Total order value is: {total_order_value:.2f}")
    
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
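Since the task bodies are plain Python, their logic can be checked outside Airflow before deploying the DAG. The sketch below re-implements the extract and transform steps without decorators (extract_plain, transform_plain and the skip_bools flag are hypothetical names added for illustration) and shows why booleans must be excluded: bool is a subclass of int, so the five True values would otherwise add 5 to the total.

```python
import json


def extract_plain() -> dict:
    """Same data-building logic as the extract task, without the Airflow decorator."""
    parts = [
        {f"key_{i}": f"value_{i}" for i in range(1, 11)},
        {f"int_key_{i}": i * 100 for i in range(1, 11)},
        {f"list_key_{i}": [i, i * 2, i * 3] for i in range(1, 11)},
        {f"bool_key_{i}": i % 2 == 0 for i in range(1, 11)},
        {f"nested_key_{i}": {"subkey": i, "subvalue": f"sub_{i}"} for i in range(1, 11)},
    ]
    combined = {}
    for part in parts:
        combined.update(json.loads(json.dumps(part)))  # JSON round trip, as in the task
    return combined


def transform_plain(combined_data_dict: dict, skip_bools: bool = True) -> dict:
    total = 0
    for value in combined_data_dict.values():
        # isinstance(True, int) is True, hence the explicit bool check
        if isinstance(value, int) and not (skip_bools and isinstance(value, bool)):
            total += value
    return {"total_order_value": total}


data = extract_plain()
print(transform_plain(data))                    # {'total_order_value': 5500}
print(transform_plain(data, skip_bools=False))  # {'total_order_value': 5505}
```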

<h3 style="color: darkturquoise;"> Recap: Common parameters for the @dag decorator</h3>
<div style="margin-top: -8px;">
    
1. schedule (optional): Defines how often the DAG should run. This can be a cron expression, a datetime.timedelta object, one of Airflow's presets such as @daily, or None for DAGs that are only triggered manually.

2. start_date (required, either here or through default_args): The start date for the DAG. This is a datetime.datetime object that marks the earliest date at which the DAG can start running.

3. end_date (optional): The end date for the DAG. This is a datetime.datetime object that, if specified, prevents the DAG from executing after this date.

4. tags (optional): A list of tags that can be used for filtering and organizing DAGs within the Airflow UI.

5. catchup (optional, default True via the catchup_by_default setting): If set to False, the scheduler only creates a run for the most recent data interval instead of backfilling every interval missed since start_date.

6. max_active_runs (optional): This parameter limits the number of DAG runs that can be running concurrently.

7. default_args (optional): A dictionary of default parameters to be applied to all operators in the DAG.

8. description (optional): A string describing the DAG, which is visible in the Airflow UI.

9. on_failure_callback (optional): A function that is called when a DAG fails. This can be used for custom error handling.

10. on_success_callback (optional): Similar to on_failure_callback, but this function is called when the DAG succeeds.

11. on_retry_callback: not a DAG-level parameter. It belongs to tasks (see the @task parameters below) and can be applied to all the tasks of a DAG through default_args.

12. max_active_tasks (optional): Sets the number of tasks that can run simultaneously for this DAG.

13. dagrun_timeout (optional): Specifies the maximum runtime for a DAG run beyond which it will be marked as failed.

14. doc_md (optional): Markdown text that will be displayed in the Airflow web UI about this DAG.

15. orientation (optional): Controls the orientation of the DAG visualization in the Airflow UI. It can be either LR (left-to-right) or TB (top-to-bottom).

16. render_template_as_native_obj (optional): When set to True, template fields will be rendered to their native Python types rather than strings.

17. template_searchpath (optional): A list of folders (or paths) that Airflow should look in for loading templates.

18. sla_miss_callback (optional): A function to call when an SLA (Service Level Agreement) miss occurs.

19. params (optional): A dictionary of DAG-level parameters that are accessible from operators.

20. access_control (optional): A dictionary defining the access control for the DAG, useful in multi-tenant environments.

21. is_paused_upon_creation (optional): If set to True, the DAG will be in a paused state when created for the first time.

These parameters provide extensive customization and control over the behavior and execution of DAGs in Airflow, making the @dag decorator a fundamental tool for defining workflows.
</div>
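How schedule, start_date and catchup interact can be pictured with a simplified model, assuming a fixed timedelta schedule: a run is created for each data interval only once that interval has fully elapsed, and with catchup=False only the most recent complete interval is kept. This is an illustration, not Airflow's scheduler code; intervals_to_schedule is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def intervals_to_schedule(start_date, now, every, catchup=True):
    """Simplified model: list the (start, end) data intervals that would get a DAG run."""
    intervals = []
    begin = start_date
    while begin + every <= now:  # an interval is scheduled only after it has ended
        intervals.append((begin, begin + every))
        begin += every
    # catchup=False: skip the backlog and keep only the latest complete interval
    return intervals if catchup else intervals[-1:]


start = datetime(2021, 1, 1, tzinfo=timezone.utc)
now = datetime(2021, 1, 4, 12, tzinfo=timezone.utc)
all_runs = intervals_to_schedule(start, now, timedelta(days=1))
latest_only = intervals_to_schedule(start, now, timedelta(days=1), catchup=False)
print(len(all_runs))                # 3
print(latest_only[0][0].date())     # 2021-01-03
```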

<h3 style="color: darkturquoise;"> Recap: Common parameters for the @task decorator </h3>
<div style="margin-top: -8px;">

1. python_callable (optional): This is the Python function you are decorating. Generally, you don't need to explicitly specify this, as the decorator is applied directly to the function.

2. multiple_outputs (optional, default False): If set to True, this allows the task to return a dictionary where each key will be used as an output XCom key. This is useful when your task returns multiple values that you want to pass to downstream tasks.

3. task_id (optional): This is the identifier for the task within the DAG. If not specified, Airflow will automatically generate an ID based on the function name.

4. executor_config (optional): This parameter can be used to provide executor-specific configuration settings, particularly useful in different execution environments like KubernetesExecutor.

5. retry_delay (optional): Sets the delay between retry attempts. It accepts a datetime.timedelta value.

6. retries (optional): Determines the number of retries that should be attempted in case of failure.

7. trigger_rule (optional): Defines the rule by which the task gets triggered. The default value is "all_success", meaning the task will run when all its upstream tasks have succeeded.

8. depends_on_past (optional, default False): If set to True, the task instance depends on the success of the task instance for the previous schedule period.

9. provide_context (Airflow 1.x only): In Airflow 1.x, setting it to True made Airflow pass the task context as keyword arguments to the callable. In Airflow 2.x it has been removed: context variables are passed automatically when the function declares them as parameters (e.g. ti, or **context), and they can also be fetched with get_current_context().

10. params (optional): Allows passing a dictionary of parameters that can be accessed by the task.

11. dag (optional): Specifies the DAG that the task belongs to. Typically, you don't need to set this if your task is defined within the context of a DAG.

12. op_args and op_kwargs (optional): These are lists and dictionaries of positional and keyword arguments that will get unpacked and passed into your Python callable.

13. on_failure_callback, on_success_callback, and on_retry_callback (optional): These parameters allow you to specify functions that should be called on task failure, success, or retry.

14. start_date (optional): The start date of the task.

15. end_date (optional): The end date of the task.

16. doc_md (optional): Markdown text that will be displayed in the Airflow web UI about this task.

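17. execution_timeout (optional): A datetime.timedelta giving the maximum runtime of the task, after which the task fails (and may be retried). A parameter named timeout exists only for sensors.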

18. pool (optional): The pool parameter allows assigning a task to a specific pool of resources. Pools are a way to limit the execution parallelism on arbitrary sets of tasks. <br> 
They are used to constrain the number of running instances of a task across the DAGs. <br>
Each pool can have a predefined number of slots (parallelism capacity). When a task is assigned to a pool, it will use one of these slots when running. <br>
If no slots are available, the task will wait in a queued state until a slot is free.

19. queue: The queue parameter is used when working with the CeleryExecutor in Airflow. <br> 
It allows specifying which queue (in the Celery task queue sense) the task should be sent to. <br> 
Useful for directing certain tasks to specific workers that might have different capabilities or resources. <br>
Each queue can be consumed by a specific set of workers. <br>
By assigning tasks to different queues, it is possible to control which worker processes which task. 




</div>
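The slot mechanism described for pools behaves like a counting semaphore: a task takes a slot while running and waits ("queued") when none is free. The toy model below is only an analogy written with threads (Pool here is a hypothetical class; real Airflow pools are enforced by the scheduler, not by a semaphore).

```python
import threading
import time


class Pool:
    """Toy model of an Airflow pool: a fixed number of slots shared by tasks."""

    def __init__(self, slots):
        self._slots = threading.BoundedSemaphore(slots)
        self._lock = threading.Lock()
        self.running = 0
        self.peak = 0  # highest number of tasks observed running at the same time

    def run(self, task_fn):
        with self._slots:  # blocks ("queued") until a slot is free
            with self._lock:
                self.running += 1
                self.peak = max(self.peak, self.running)
            try:
                task_fn()
            finally:
                with self._lock:
                    self.running -= 1


def make_task(i, done):
    def task():
        time.sleep(0.05)  # pretend to do some work
        done.append(i)
    return task


pool = Pool(slots=2)
done = []
threads = [threading.Thread(target=pool.run, args=(make_task(i, done),)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(pool.peak, sorted(done))  # never more than 2 tasks ran at once
```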

In [18]:
""" Calling the decorated function creates the DAG: there is no need to create the dag variable as in the previous examples. """
function_5_create_data_pipeline_for_ETL()

<DAG: function_5_create_data_pipeline_for_ETL>

<h2 style="color: darkturquoise;"> Example #6 </h2>
Data pipeline with more complex extraction, transformation, and loading procedures.
This time many optional parameters are passed to the decorators.

In [24]:
default_args = {
    "owner": "Jack4", 
    "depends_on_past": True,                                # Wait for the success of the previous run of the same task
    "email_on_failure": False,                              # Send an email if a task fails
    "email_on_retry": False,                                # Send an email on task retry
    "email": ["example_email@example.com"],                 # Recipients of the email notifications
    "retries": 1,                                           # Retry the specified number of times, if a task fails
    "retry_delay": timedelta(minutes=5),                    # Delay between retries
    # "catchup" is omitted: it is a DAG-level setting (catchup=False is passed to @dag), not a task argument
    "start_date": pendulum.datetime(2021, 1, 1, tz="UTC"),  # Start date of workflow
}


In [31]:
def callback_for_failure(context: Context):
    """ Log errors in case of failures during task execution, given the context of the task. """
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    execution_date = context['execution_date']  # deprecated alias of context['logical_date'] since Airflow 2.2
    exception = context['exception']
    
    ### Log info about failures
    logger = LoggingMixin().log
    logger.error(f"Failure detected in task '{task_id}' of DAG '{dag_id}' on {execution_date}")
    logger.error(f"Exception: {exception}")
    ### Send an email notification (requires the SMTP settings shown in Example #8)
    subject = f"Airflow Task Failure: {dag_id}.{task_id}"
    body = f"Task '{task_id}' in DAG '{dag_id}' failed on {execution_date}<br>Exception: {exception}"
    send_email(to="your_email@example.com", subject=subject, html_content=body)

def callback_for_success(context: Context):
    """ Print a confirmation when a task succeeds. """
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    execution_date = context['execution_date']
    print(f"Task {task_id} in DAG {dag_id} succeeded for the run of {execution_date}")

def callback_for_retry(context: Context):
    """ Print a message when a task is about to be retried. """
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    execution_date = context['execution_date']
    print(f"Task {task_id} in DAG {dag_id} is up for retry (run of {execution_date})")


In [33]:
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    end_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example_6"],
    default_args=default_args,
    description="Example DAG number 6",
    max_active_runs=3,
    dagrun_timeout=timedelta(hours=2),
    doc_md="""Here is the DAG documentation of our example DAG in Markdown format.""",
    max_active_tasks=10,
    on_failure_callback=None,
    on_success_callback=None,
    sla_miss_callback=None,
    params={"example_param": "param_value"},
    access_control={"role1": {"can_read", "can_edit"}},
    is_paused_upon_creation=False,
    orientation="LR"
)
def function_6_create_another_pipeline_for_ETL():
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        execution_timeout=timedelta(seconds=300),  # tasks use execution_timeout (a timedelta), not timeout
        on_failure_callback=callback_for_failure,
        on_success_callback=callback_for_success,
        on_retry_callback=callback_for_retry,
        # A Kubernetes-specific executor_config only takes effect when the KubernetesExecutor is in use.
        #executor_config={"KubernetesExecutor": {"image": "~/my-customimage:latest"}},
        pool="default_pool",          # default_pool always exists
        queue="default_queue",        # queues are only honoured by the CeleryExecutor
        trigger_rule="all_success"
    )
    def extract():
        """ Simulate data fetching from an external source """
        data_string = json.dumps({
            "1001": random.uniform(200, 400), 
            "1002": random.uniform(400, 600), 
            "1003": random.uniform(500, 700)
        })
        order_data_dict = json.loads(data_string)
        filtered_data = {k: v for k, v in order_data_dict.items() if v > 300}
        return filtered_data
    
    @task(
        multiple_outputs=True,
        retries=2,
        retry_delay=timedelta(minutes=10),
        execution_timeout=timedelta(seconds=600),
        on_failure_callback=callback_for_failure,
        on_success_callback=callback_for_success,
        on_retry_callback=callback_for_retry,
        pool="high_priority_pool",    # must be created beforehand (Admin > Pools), otherwise the task is never scheduled
        queue="high_priority_queue",
        trigger_rule="one_success"
    )
    def transform(my_data_extracted: dict):
        """ Compute summary statistics of the extracted order values. """
        total_order_value = 0
        order_count = len(my_data_extracted)

        max_order_value = -float('inf')
        min_order_value = float('inf')

        for value in my_data_extracted.values():
            total_order_value += value
            max_order_value = max(max_order_value, value)
            min_order_value = min(min_order_value, value)

        if order_count == 0:
            average_order_value = 0
            max_order_value = 0
            min_order_value = 0
        else:
            average_order_value = total_order_value / order_count

        ## Sanity check: fail the task on impossible values
        if total_order_value < 0:
            raise ValueError("Total order value cannot be negative")
        # Return a dictionary
        return {
            "total_order_value": total_order_value,
            "average_order_value": average_order_value,
            "max_order_value": max_order_value,
            "min_order_value": min_order_value
        }

    @task(
        retries=1,
        retry_delay=timedelta(minutes=15),
        execution_timeout=timedelta(seconds=120),
        email_on_failure=True,
        email=["example_mail@emailprovider.com"],
        pool="special_pool_6",        # must also exist beforehand
        queue="special_queue_6",
        trigger_rule="all_success",   # "all_failed" would skip load whenever transform succeeds
        weight_rule="downstream",
        end_date=datetime(2024, 1, 1)
    )
    def load(final_dict: dict):
        """ Simulate the loading of transformed data to a target destination, by printing the result. """
        print("Loading data...")
        for key, value in final_dict.items():
            print(f"Key: {key}, Value: {value}")

        print("Data loaded successfully.")
        return "Load completed"

    ### Call the tasks to add them to the DAG and set the dependencies: extract >> transform >> load
    extracted_data = extract()
    order_summary = transform(extracted_data)
    load(order_summary)


dag_6_etl = function_6_create_another_pipeline_for_ETL()
dag_6_etl

<DAG: function_6_create_another_pipeline_for_ETL>
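The trigger_rule chosen for each task decides whether it starts, given the final states of its upstream tasks. load() has a single upstream task, transform(), so a rule such as "all_failed" means it would only start after transform fails. The function below is a simplified model of a few rules (should_run is a hypothetical name, not Airflow's implementation; real Airflow marks a task whose rule can no longer be met as skipped or upstream_failed).

```python
def should_run(trigger_rule: str, upstream_states: list) -> bool:
    """Simplified model of a few Airflow trigger rules."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_success":
        return any(state == "success" for state in upstream_states)
    if trigger_rule == "all_failed":
        return all(state in ("failed", "upstream_failed") for state in upstream_states)
    if trigger_rule == "all_done":
        finished = ("success", "failed", "upstream_failed", "skipped")
        return all(state in finished for state in upstream_states)
    raise ValueError(f"trigger rule not modelled in this sketch: {trigger_rule}")


# load() has one upstream task, transform()
print(should_run("all_success", ["success"]))  # True
print(should_run("all_failed", ["success"]))   # False: load would not run after a successful transform
```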

<h2 style="color: darkturquoise;"> Example #7 </h2>
Retrieve info about a DAG with DagBag

<h3 style="color: darkturquoise;"> Recap: Kubernetes </h3>
<div style="margin-top: -8px;">

DagBag does not require the KubernetesExecutor in Apache Airflow. <br>
However, DagBag parses every file in the dags folder, so if some of those files include imports or logic that need Kubernetes-related libraries, those libraries must be installed for the parsing to succeed. <br>
=> The KubernetesExecutor is an executor that runs each Airflow task in its own Kubernetes pod. <br>
Kubernetes is an open-source platform that automates the deployment, scaling, and operation of containerized applications. <br>
</div>

In [21]:
def print_dag_summary(dag_id):
    """ Print some relevant properties of the given DAG.
    The info is taken from Airflow (DagBag), not from the code above: therefore the DAG needs to be defined in a Python file in the airflow/dags folder.
    """
    dag_bag = DagBag()
    dag = dag_bag.get_dag(dag_id)

    if dag is None:
        print(f"No DAG found with id: {dag_id}")
        return

    print(f"DAG ID: {dag.dag_id}")
    print(f"Description: {dag.description}")
    print(f"Schedule Interval: {dag.schedule_interval}")
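    # N.B. these two lines only look in default_args: Example #5 passed start_date directly to @dag,
    # hence "Not specified" in the output below. dag.start_date and dag.end_date hold the effective values.
    print(f"Start Date: {dag.default_args.get('start_date', 'Not specified')}")
    print(f"End Date: {dag.default_args.get('end_date', 'Not specified')}")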
    print(f"Number of Tasks: {len(dag.tasks)}")
    print(f"Tags: {dag.tags}")
    print(f"Last Run: {dag.get_last_dagrun()}")

""" When a DAG is created with the @dag decorator without an explicit dag_id, 
its dag_id defaults to the name of the decorated function, so that name is used to look the DAG up. 
"""
print_dag_summary('function_5_create_data_pipeline_for_ETL')

[2024-01-18T12:49:39.134+0100] {dagbag.py:538} INFO - Filling up the DagBag from /home/notto4/airflow/dags


DAG ID: function_5_create_data_pipeline_for_ETL
Description: None
Schedule Interval: None
Start Date: Not specified
End Date: Not specified
Number of Tasks: 3
Tags: ['example_5']
Last Run: None


<h3 style="color: darkturquoise;"> Notes: Executors </h3>
<div style="margin-top: -8px;">
If no executor is configured (the executor option in the [core] section of "airflow.cfg"), Airflow uses the SequentialExecutor by default, which suits simple, single-process workflows. <br>
The SequentialExecutor runs one task at a time in a single process, so tasks never run in parallel. <br>
For more complex scenarios, Airflow is typically configured with an executor that supports parallelism, such as the LocalExecutor, CeleryExecutor, or KubernetesExecutor. <br>
</div>
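The practical difference between running one task at a time and running tasks in parallel can be felt with a toy timing experiment. This is only an analogy: threads keep the sketch short, whereas the LocalExecutor actually starts separate processes, and fake_task is a hypothetical stand-in for a task that mostly waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(seconds: float) -> float:
    """Stand-in for a task that mostly waits (e.g. on I/O)."""
    time.sleep(seconds)
    return seconds


durations = [0.1] * 4

# SequentialExecutor-like: one task at a time
start = time.perf_counter()
for d in durations:
    fake_task(d)
sequential_elapsed = time.perf_counter() - start

# LocalExecutor-like: several tasks at once on the same machine
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(fake_task, durations))
parallel_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, parallel: {parallel_elapsed:.2f}s")
```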


**SequentialExecutor**

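The **LocalExecutor** can instead be used to enable parallelism in a simple manner: it runs tasks in parallel processes on the same machine. <br>
It does not work with the default SQLite database, so it also needs a database such as MySQL or PostgreSQL. Write the following lines into the 'airflow.cfg' file, initialize the new database (airflow db migrate, or airflow db init on older releases), then restart the Airflow services.

[core] <br>
executor = LocalExecutor <br>
[database] <br>
sql_alchemy_conn = mysql+pymysql://username:password@localhost/airflow <br>

(In Airflow versions before 2.3, sql_alchemy_conn belongs to the [core] section instead.)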
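Since airflow.cfg is an INI-style file, a setup like the one above can be sanity-checked with configparser before restarting the services. check_executor_config is a hypothetical helper, not part of Airflow; it assumes the section layout of Airflow 2.3+ and falls back to [core] for older files.

```python
import configparser


def check_executor_config(cfg_text: str) -> list:
    """Hypothetical helper: flag a LocalExecutor setup that still relies on SQLite."""
    cfg = configparser.ConfigParser(interpolation=None)  # no interpolation: URLs may contain '%'
    cfg.read_string(cfg_text)
    executor = cfg.get("core", "executor", fallback="SequentialExecutor")
    # Airflow >= 2.3 reads the connection from [database]; older versions from [core]
    conn = cfg.get("database", "sql_alchemy_conn",
                   fallback=cfg.get("core", "sql_alchemy_conn", fallback="sqlite"))
    problems = []
    if executor == "LocalExecutor" and conn.startswith("sqlite"):
        problems.append("LocalExecutor needs a database such as MySQL or PostgreSQL, not SQLite")
    return problems


cfg_ok = """
[core]
executor = LocalExecutor
[database]
sql_alchemy_conn = mysql+pymysql://username:password@localhost/airflow
"""
print(check_executor_config(cfg_ok))                                # []
print(check_executor_config("[core]\nexecutor = LocalExecutor\n"))  # one warning: SQLite is assumed
```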

<h2 style="color: darkturquoise;"> Example #8 </h2>
Create a notifier

Edit "airflow.cfg" to include your SMTP settings: <br>

[email] <br>
email_backend = airflow.utils.email.send_email_smtp <br>

[smtp]  <br>
smtp_host = smtp.example.com <br>
smtp_starttls = True <br>
smtp_ssl = False <br>
smtp_user = email_tmp_ok@guerillmail.info <br>
smtp_password = your_password <br>
smtp_port = 587 <br>
smtp_mail_from = email_tmp_ok@guerillmail.info <br>

In [41]:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def send_my_message(subject, message, to_email):
    #### SMTP server configuration (placeholder values: keep real credentials out of the code)
    smtp_host = 'smtp.example.com'
    smtp_port = 587
    smtp_user = 'email_tmp_ok@guerillmail.info'
    smtp_password = 'tmpTestEmailPwd'
    from_email = 'airflow@example.com'  # some servers may require this to match smtp_user

    #### Create the email content
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = to_email
    msg['Subject'] = subject
    msg.attach(MIMEText(message, 'plain'))

    ########## Send the email (the with-block closes the connection even if an error occurs)
    try:
        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.send_message(msg)
        print("The email was sent successfully!")
    except (smtplib.SMTPException, OSError) as e:
        print(f"Error: unable to send email: {e}")
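Separating the construction of the message from the SMTP connection makes the content testable without any mail server. build_message is a hypothetical helper that builds the same MIME message as send_my_message and returns it instead of sending it.

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_message(subject: str, message: str, from_email: str, to_email: str) -> MIMEMultipart:
    """Hypothetical helper: build the email without sending it."""
    msg = MIMEMultipart()
    msg["From"] = from_email
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.attach(MIMEText(message, "plain"))  # plain-text body as the first MIME part
    return msg


msg = build_message("Task failed", "Failure! Oooops!", "airflow@example.com", "someone@example.com")
print(msg["Subject"])                                      # Task failed
print(msg.get_payload()[0].get_payload(decode=True).decode())
```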

In [43]:
""" Create the notifier """
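class Notifier8(BaseNotifier):
    template_fields = ("message",)  # rendered with Jinja against the context before notify() runs

    def __init__(self, message, to_email="your_email@example.com"):
        super().__init__()
        self.message = message
        self.to_email = to_email

    def notify(self, context):
        """ Send notification message """
        title = f"Airflow notification for task {context['task_instance'].task_id}"
        send_my_message(title, self.message, self.to_email)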
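Why a notifier instance can be passed directly as on_success_callback or on_failure_callback: the instance itself is callable, and calling it (roughly: render the template fields, then call notify) performs the notification. The pure-Python mock below only illustrates that pattern; ToyNotifier is hypothetical, it is not Airflow's BaseNotifier, and it uses a simplified context dict with a task_id key.

```python
class ToyNotifier:
    """Pure-Python mock of the notifier pattern (not Airflow's BaseNotifier)."""

    def __init__(self, message: str):
        self.message = message
        self.sent = []  # records what would have been sent

    def notify(self, context: dict) -> None:
        title = f"Task {context['task_id']} finished"
        self.sent.append((title, self.message))

    def __call__(self, context: dict) -> None:
        # Callbacks are invoked as callback(context); delegating to notify() mirrors that idea
        self.notify(context)


callback = ToyNotifier(message="Failure! Oooops!")
callback({"task_id": "example_8_task"})  # what the scheduler would do on failure
print(callback.sent)  # [('Task example_8_task finished', 'Failure! Oooops!')]
```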

In [46]:
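with DAG(
    dag_id="example_notifier",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule=None,                      # replaces the deprecated schedule_interval argument
    tags=['notifier'],
    on_success_callback=Notifier8(message="Success! Perfect!"),
    on_failure_callback=Notifier8(message="Failure! Oooops!"),
) as dag8:
    # "exit 1" makes the task fail on purpose: the DAG-level failure notifier fires, the task-level success one does not
    failing_task = BashOperator(
        task_id="example_8_task",
        bash_command="exit 1",
        on_success_callback=Notifier8(message="Task Succeeded!"),
    )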

In [47]:
dag8

<DAG: example_notifier>