# How to use Airflow!


Apache Airflow is a workflow management platform that schedules and monitors tasks. In the following, I will introduce you to how Airflow works.

## First: How does a function work?

As Airflow is primarily a sequence of functions, we should understand how they work!

In [1]:
def my_function():
    print("Hello")

In [2]:
my_function()

Hello


This is the simplest form a function can have. You define it with the `def` keyword and a specific name ("my_function"), followed by () and a colon.

Within the indented block, you write what your function needs to do. In the next example, we will define a small calculation function ("my_calc"). Here, we specify the parameters (num1 and num2) that the function needs to work.

In [5]:
def my_calc(num1, num2):
    result = num1 + num2 
    return result

Using specified arguments for each parameter, we can calculate 1+2 or 3+4. The arguments are passed either by position or as keyword arguments. Parameters can also be given default values in the function definition, which are used whenever the corresponding argument is omitted.

In [12]:
# Positional arguments
my_calc(1, 2)

3

In [21]:
# Keyword arguments
my_calc(num1= 1, num2 = 2)

3

In [19]:
# num2 has a default value of 1
# parameters with default values must be defined after all parameters without defaults
def my_calc_default(num1, num2=1):
    result = num1 + num2 
    return result

In [20]:
# Keyword arguments
my_calc_default(num1 = 5)

6
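
To make the rule about default values concrete, here is a small sketch. It overrides the default and then shows that Python rejects a definition in which a parameter without a default follows one with a default. The name `broken` and the use of `compile` to observe the error safely are assumptions made for this illustration only.

```python
def my_calc_default(num1, num2=1):
    return num1 + num2

# The default can still be overridden, by position or by keyword
override_positional = my_calc_default(5, 10)
override_keyword = my_calc_default(num1=5, num2=10)
print(override_positional, override_keyword)  # 15 15

# A parameter without a default after one with a default is a syntax error.
# compile() lets us observe the error without breaking this cell.
bad_source = "def broken(num1=1, num2):\n    return num1 + num2\n"
try:
    compile(bad_source, "<example>", "exec")
    definition_rejected = False
except SyntaxError:
    definition_rejected = True
print(definition_rejected)  # True
```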

## What is Airflow?

Airflow takes different tasks and orders them using a DAG. DAG stands for Directed Acyclic Graph.

Directed -> One Direction only, e.g. A -> B and not A <-> B

Acyclic -> No cycles, e.g. not A -> B -> A
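
The two properties can be tried out in plain Python. The sketch below uses `graphlib` from the standard library (Python 3.9 or newer), not Airflow itself, so it only illustrates the idea of a DAG. The task names A, B and C are made up for the example.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks that must run before it
valid_graph = {"B": {"A"}, "C": {"B"}}  # A -> B -> C
order = list(TopologicalSorter(valid_graph).static_order())
print(order)  # ['A', 'B', 'C']

# A -> B -> A contains a cycle, so no valid execution order exists
cyclic_graph = {"B": {"A"}, "A": {"B"}}
try:
    list(TopologicalSorter(cyclic_graph).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```

Because the graph is directed and has no cycles, there is always at least one order in which every task runs after the tasks it depends on.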


## The Structure
### The difference between original Airflow and TaskFlow

In [22]:
import datetime
import pendulum
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator

# Original Structure:
def task1():
    x = 2+2
    return x

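def task2(ti):
    # A PythonOperator passes no positional arguments by default,
    # so the result of task1 is pulled from XCom via the task instance
    x = ti.xcom_pull(task_ids="task1")
    y = x * x
    return y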
    
with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    # t1 and t2 are examples of tasks created by instantiating operators
    t1 = PythonOperator(
        task_id="task1",
        python_callable=task1,
    )

    t2 = PythonOperator(
        task_id="task2",
        python_callable=task2,
        retries=3,
    )

    # Order of DAG: First t1, then t2
    t1 >> t2


# TaskFlow Structure:
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def complete_task_overview():
    # your task 1 settings
    @task(retries=1)
    def task1():
        x = 2+2
        return x
    # your task 2 settings
    @task(retries=2)
    def task2(x):
        y = x * x
        return y
    
    # Define the order by calling the tasks like normal functions
    # The return value of task1 is saved as example and passed on to task2
    example = task1()
    task2(example)
    
# Call the decorated function to create the DAG
complete_task_overview()

<DAG: complete_task_overview>
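
The `t1 >> t2` syntax from the original structure works because Python lets a class define what `>>` means through the `__rshift__` method, and Airflow's operators rely on this mechanism. Here is a minimal sketch of the idea, using a hypothetical `ToyTask` class that is not part of Airflow and leaves out everything else a real operator does:

```python
class ToyTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` records that `other` runs after `self`
        self.downstream.append(other)
        # Returning `other` allows chaining: a >> b >> c
        return other


t1 = ToyTask("task1")
t2 = ToyTask("task2")
t3 = ToyTask("task3")
t1 >> t2 >> t3

print([t.task_id for t in t1.downstream])  # ['task2']
print([t.task_id for t in t2.downstream])  # ['task3']
```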

## A more difficult example! Which is easier to read?

In [14]:
# Original
# Source
# https://docs.astronomer.io/learn/airflow-decorators?tab=traditional#how-to-use-airflow-decorators

import logging
from datetime import datetime
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"


def _extract_bitcoin_price():
    return requests.get(API).json()["bitcoin"]

def _process_data(ti):
    response = ti.xcom_pull(task_ids="extract_bitcoin_price")
    logging.info(response)
    processed_data = {"usd": response["usd"], "change": response["usd_24h_change"]}
    ti.xcom_push(key="processed_data", value=processed_data)

def _store_data(ti):
    data = ti.xcom_pull(task_ids="process_data", key="processed_data")
    logging.info(f"Store: {data['usd']} with change {data['change']}")

with DAG(
    "classic_dag", schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False):
    extract_bitcoin_price = PythonOperator(
        task_id="extract_bitcoin_price", python_callable=_extract_bitcoin_price)

    process_data = PythonOperator(task_id="process_data", python_callable=_process_data)

    store_data = PythonOperator(task_id="store_data", python_callable=_store_data)

    extract_bitcoin_price >> process_data >> store_data
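
In the original structure above, the tasks exchange data through `ti.xcom_pull` and `ti.xcom_push`. The following toy model sketches that idea with a plain dictionary. `ToyTaskInstance` and the price values are made up for this illustration. The class is not Airflow's task instance and only mimics how values are stored per task id and key, with `"return_value"` as the key for a function's return value.

```python
class ToyTaskInstance:
    """Hypothetical stand-in for `ti`; it only mimics the push/pull idea."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self.store.get((task_ids, key))


store = {}

# Task 1: a returned value is stored under the key "return_value"
# (the numbers are invented sample data, not real prices)
store[("extract_bitcoin_price", "return_value")] = {"usd": 100.0, "usd_24h_change": -1.5}

# Task 2: pull the upstream result, then push a processed version
ti = ToyTaskInstance("process_data", store)
response = ti.xcom_pull(task_ids="extract_bitcoin_price")
ti.xcom_push(
    key="processed_data",
    value={"usd": response["usd"], "change": response["usd_24h_change"]},
)

# Task 3: pull the processed data by task id and key
ti = ToyTaskInstance("store_data", store)
data = ti.xcom_pull(task_ids="process_data", key="processed_data")
print(data)  # {'usd': 100.0, 'change': -1.5}
```

With TaskFlow, this bookkeeping is hidden behind ordinary function arguments and return values.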

In [None]:
# TaskFlow
# Source
# https://docs.astronomer.io/learn/airflow-decorators?tab=taskflow#how-to-use-airflow-decorators

"""
Structure:

"""
import logging
from datetime import datetime
from typing import Dict
import requests
from airflow.decorators import dag, task

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    @task(task_id="extract", retries=2)
    def extract_bitcoin_price() -> Dict[str, float]:
        return requests.get(API).json()["bitcoin"]

    @task(multiple_outputs=True)
    def process_data(response: Dict[str, float]) -> Dict[str, float]:
        logging.info(response)
        return {"usd": response["usd"], "change": response["usd_24h_change"]}

    @task
    def store_data(data: Dict[str, float]):
        logging.info(f"Store: {data['usd']} with change {data['change']}")

    store_data(process_data(extract_bitcoin_price()))


taskflow()