```
From: https://github.com/ksatola
Version: 0.0.1

```

# Apache Airflow

## Table of contents

- [Jupyter Lab and Python Environment Setup](#toc00)
- [Airflow code examples](#toc01)
- [Python example](#toc02)

Resources:
- https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
- https://airflow.apache.org/docs/apache-airflow/stable/usage-cli.html
- https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
- https://towardsdatascience.com/master-apache-airflow-how-to-install-and-setup-the-environment-in-10-minutes-61dad52d5239

---
<a id='toc00'></a>

## Jupyter Lab and Python Environment Setup
```
# In WSL Terminal
IMAGE_NAME='ksatola/ubuntu-python-dev-base'
CONTAINER_NAME='ubuntu-python-dev-base-apache-airflow'

docker run -d -t -P \
    --name $CONTAINER_NAME \
    --mount src='/home/ksatola/git',target='/root/git',type=bind \
    $IMAGE_NAME


# Connect to the container from VS Code using Remote Explorer

# Dependencies

# -----
# Airflow locally
# https://airflow.apache.org/docs/apache-airflow/stable/start/local.html

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.4

PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
echo $PYTHON_VERSION
# For example: 3.9

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.9.txt

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# initialize the database
airflow db init

airflow users create \
    --username admin \
    --firstname FIRST_NAME \
    --lastname LAST_NAME \
    --role Admin \
    --email admin@example.org
# When prompted, enter a password (this walkthrough uses: admin)

# start the web server (the default port is 8080; this setup uses 8085)
airflow webserver --port 8085

# start the scheduler
# open a new terminal, or run the webserver with the -D option to run it as a daemon
airflow scheduler

# visit localhost:8085 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page

# Upon running these commands, Airflow creates the $AIRFLOW_HOME folder and
# an "airflow.cfg" file with defaults that will get you going fast.
# You can inspect the file either in $AIRFLOW_HOME/airflow.cfg or through the
# UI in the Admin->Configuration menu. The PID file for the webserver is
# stored in $AIRFLOW_HOME/airflow-webserver.pid, or in
# /run/airflow/webserver.pid if started by systemd.

airflow dags list

# run your first task instance
airflow tasks run example_bash_operator runme_0 2015-01-01

# run a backfill over 2 days
airflow dags backfill example_bash_operator \
    --start-date 2015-01-01 \
    --end-date 2015-01-02
# -----


pip install jupyterlab

# Run in the container
jupyter lab --no-browser --allow-root

```
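
The constraints URL in the setup block is assembled from the Airflow version and the major.minor Python version. A minimal Python sketch of the same string construction follows; the helper name `constraint_url` is made up for illustration and is not part of Airflow.

```python
import sys


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints-file URL for an Airflow and Python version pair."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Major.minor of the running interpreter, mirroring the shell pipeline above
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
print(constraint_url("2.1.4", python_version))
```

Printing the URL before running `pip install` is a cheap way to check that the detected Python version is one the chosen Airflow release publishes constraints for.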

---
<a id='toc01'></a>

## DAG

In [None]:
# Import the DAG object
from datetime import datetime
from airflow.models import DAG

# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}

# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)
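
With `'retries': 2` in `default_args`, a failing task is attempted up to three times in total: the first try plus two retries. The sketch below illustrates that counting with plain Python. It is not Airflow's scheduler code, and `run_with_retries` and `flaky_task` are hypothetical names.

```python
import time
from datetime import timedelta


def run_with_retries(task, retries=2, retry_delay=timedelta(seconds=0)):
    """Call task() up to retries + 1 times; return (result, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: the failure propagates
            time.sleep(retry_delay.total_seconds())


# Hypothetical task that fails twice, then succeeds on the third attempt
calls = {"count": 0}


def flaky_task():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "done"


result, attempts = run_with_retries(flaky_task, retries=2)
print(result, attempts)
```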

## Bash Workflow Operator

In [None]:
# Import the BashOperator
from airflow.operators.bash import BashOperator

# Define the BashOperator 
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=etl_dag
)

In [None]:
consolidate = BashOperator(
    task_id='consolidate_task',
    # Define the bash_command
    bash_command='consolidate.sh',
    # Add the task to the dag
    dag=etl_dag
)

In [None]:
pull_sales = BashOperator(
    task_id='pullsales_task',
    # Quote the URL so the shell does not treat "?" as a glob character
    bash_command='wget "https://salestracking/latestinfo?json"',
    dag=etl_dag
)

In [None]:
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push.sh',
    dag=etl_dag
)

## Tasks Order (upstream/prior, downstream/following)

In [None]:
# Set pull_sales to run prior to cleanup
pull_sales >> cleanup

# Configure consolidate to run after cleanup
cleanup >> consolidate

# Set push_data to run last
consolidate >> push_data
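
The `>>` operator records that the left-hand task is upstream of the right-hand one. Here is a rough sketch of how such an operator can be built with `__rshift__` and how a valid run order falls out of the recorded dependencies. The `Task` class and `run_order` function are simplified stand-ins, not Airflow's implementation.

```python
class Task:
    """Hypothetical stand-in for an operator; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b means: a runs before b
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning the right-hand task allows a >> b >> c


def run_order(tasks):
    """Return task ids so that each task follows all of its upstream tasks."""
    done, order = set(), []
    pending = list(tasks)
    while pending:
        ready = [t for t in pending if t.upstream <= done]
        if not ready:
            raise ValueError("cycle detected")
        for t in ready:
            done.add(t.task_id)
            order.append(t.task_id)
        pending = [t for t in pending if t.task_id not in done]
    return order


pull_sales = Task("pullsales_task")
cleanup = Task("cleanup_task")
consolidate = Task("consolidate_task")
push_data = Task("pushdata_task")

pull_sales >> cleanup >> consolidate >> push_data

# The input order does not matter; only the dependencies do
print(run_order([push_data, consolidate, cleanup, pull_sales]))
```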

## Python Workflow Operator

In [None]:
import requests
from airflow.operators.python import PythonOperator

def pull_file(URL, savepath):
    # Download the file and write the raw bytes to savepath
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print function for logging
    print(f"File pulled from {URL} and saved to {savepath}")

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'},
    dag=etl_dag
)
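
The keys in `op_kwargs` must match the parameter names of the callable, because the dictionary is passed to it as keyword arguments. A small sketch of that mapping follows, using a hypothetical `pull_file_stub` that does no network I/O so it can run anywhere.

```python
def pull_file_stub(URL, savepath):
    """Hypothetical stand-in for pull_file that skips the download."""
    return f"File pulled from {URL} and saved to {savepath}"


op_kwargs = {'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'}

# Same as pull_file_stub(URL='http://dataserver/sales.json', savepath='latestsales.json')
message = pull_file_stub(**op_kwargs)
print(message)
```

A misspelled key (for example `'url'` instead of `'URL'`) would raise a `TypeError` when the task runs, not when the DAG file is parsed.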

In [None]:
# Import the Operator
from airflow.operators.email import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    # files expects a list of paths to attach
    files=['parsedfile.json'],
    dag=etl_dag
)

# Set the order of tasks
pull_file_task >> email_manager_task

## DAGs Scheduling

<img src="images/cron-job-format-1.png" alt="" style="width: 600px;"/>

See: [crontab.guru](https://crontab.guru/#0_*_*_*_*)

In [None]:
from datetime import timedelta

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

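# Cron fields: minute hour day-of-month month day-of-week
# '30 12 * * 3' means every Wednesday at 12:30
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')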
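
To check what a weekly cron expression such as `'30 12 * * 3'` resolves to, the matching timestamps can be computed with the standard library alone. The helper `next_weekly_run` below is a hypothetical sketch that handles only the `minute hour * * weekday` shape; it is not a general cron parser and does not model how Airflow decides when to trigger a run.

```python
from datetime import datetime, timedelta


def next_weekly_run(after, minute, hour, cron_weekday):
    """Return the first datetime strictly after `after` that matches
    'minute hour * * cron_weekday' (cron weekdays: 0 = Sunday ... 6 = Saturday)."""
    # Python's weekday() uses 0 = Monday, so shift the cron value
    python_weekday = (cron_weekday - 1) % 7
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    days_ahead = (python_weekday - candidate.weekday()) % 7
    candidate += timedelta(days=days_ahead)
    if candidate <= after:
        candidate += timedelta(days=7)
    return candidate


first = next_weekly_run(datetime(2019, 11, 1), minute=30, hour=12, cron_weekday=3)
second = next_weekly_run(first, minute=30, hour=12, cron_weekday=3)
print(first, second)
```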

## Sensor Operator (Conditions)

In [None]:
from airflow.models import DAG
from airflow.operators.bash import BashOperator
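# In Airflow 2.x the FileSensor lives in airflow.sensors.filesystem
# (the airflow.contrib path is deprecated)
from airflow.sensors.filesystem import FileSensor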
from datetime import datetime

report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='poke',
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task
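
A sensor in `poke` mode repeatedly checks a condition, sleeping between checks, until the condition holds or a timeout is reached. The loop below sketches that idea for a file-existence check using only the standard library. `wait_for_file` and its defaults are assumptions made for illustration; the real `FileSensor` has its own defaults and fails the task on timeout instead of returning `False`.

```python
import os
import tempfile
import time


def wait_for_file(filepath, poke_interval=1.0, timeout=5.0):
    """Poll until filepath exists; return True if found, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(filepath):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


with tempfile.TemporaryDirectory() as tmp:
    ready_file = os.path.join(tmp, "salesdata_ready.csv")
    # The file does not exist yet, so the wait times out
    missing = wait_for_file(ready_file, poke_interval=0.1, timeout=0.3)
    # Create the file; the next check succeeds immediately
    open(ready_file, "w").close()
    found = wait_for_file(ready_file, poke_interval=0.1, timeout=0.3)
    print(missing, found)
```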

## Executors

---
<a id='toc02'></a>

## Python example
Run Python functions in sequence.

In [None]:
def execute_task(desc):
    print(desc)

In [None]:
default_args = {
  'owner': 'admin',
  'start_date': datetime(2021, 9, 26),
  'retries': 2
}

test_dag = DAG('test_dag', default_args=default_args)

In [None]:
from airflow.operators.python import PythonOperator

task1 = PythonOperator(
    task_id='t1',
    python_callable=execute_task,
    op_kwargs={'desc':'Task 1 executed first'},
    dag=test_dag
)

task2 = PythonOperator(
    task_id='t2',
    python_callable=execute_task,
    op_kwargs={'desc':'Task 2 executed as 3rd'},
    dag=test_dag
)

task3 = PythonOperator(
    task_id='t3',
    python_callable=execute_task,
    op_kwargs={'desc':'Task 3 executed as 2nd'},
    dag=test_dag
)

In [None]:
task1 >> task3 >> task2

In [None]:
!mkdir -p ~/airflow/dags

In [None]:
_test_dag_file = '~/airflow/dags/test_dag.py'

In [None]:
%%writefile {_test_dag_file}

# https://github.com/hgrif/airflow-tutorial
# Create a DAG file in AIRFLOW_HOME/dags
# Create test_dag.py

import datetime as dt

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def say_world():
    print('world')


default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2021, 9, 26),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}


with DAG('test_dag',
         default_args=default_args,
         schedule_interval='0 * * * *',
         ) as dag:

    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    # The callable has its own name so the task variable does not shadow it
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=say_world)


print_hello >> sleep >> print_world


```
# Run in container's terminal

cd ~/airflow/dags
python test_dag.py

# Test the DAG
airflow tasks test test_dag print_world 2021-09-26

# Run
airflow scheduler

# Check in the web UI
```