After completing this lab you will be able to:

- Start Apache Airflow.
- Open the Airflow UI in a browser.
- List all the DAGs.
- List the tasks in a DAG.
- Explore a DAG in the UI.

**Exercise 1** - Start Apache Airflow

Open a new terminal by clicking the menu bar and selecting Terminal->New Terminal.

Run the command below in the newly opened terminal to start Apache Airflow. (You can copy the code by clicking the copy button at the bottom right of the code block and paste it wherever you wish.)

In [None]:
start_airflow

Please be patient; it may take up to 5 minutes for Airflow to start.

**Exercise 2** - Open the Airflow Web UI

Copy the Web-UI URL and paste it into a new browser tab. You can also open the URL by clicking it while holding the Control key (Command key on a Mac).

You can pause or unpause a DAG using the Pause/Unpause toggle button in the UI.

**Exercise 3** - List all DAGs

Apache Airflow provides some handy command-line options to work with.

Run the command below in the terminal to list all the existing DAGs:

In [None]:
airflow dags list

**Exercise 4** - List tasks in a DAG

Run the command below in the terminal to list all the tasks in the DAG named example_bash_operator:

In [None]:
airflow tasks list example_bash_operator

You can also pause and unpause a DAG from the command line.

Run the command below in the terminal to unpause the DAG named tutorial:

In [None]:
airflow dags unpause tutorial

Run the command below in the terminal to pause the same DAG:

In [None]:
airflow dags pause tutorial

# Create a DAG for Apache Airflow

**Objectives**
After completing this lab you will be able to:

- Explore the anatomy of a DAG.
- Create a DAG.
- Submit a DAG.

**Exercise 3 - Explore the anatomy of a DAG**

An Apache Airflow DAG is a Python program. It consists of these logical blocks:

- Imports
- DAG Arguments
- DAG Definition
- Task Definitions
- Task Pipeline

**A typical imports block looks like this:**

In [None]:
# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago

**A typical DAG Arguments block looks like this:**

In [None]:
#defining DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

DAG arguments are like settings for the DAG.

The settings above specify:

- the owner name,
- when this DAG should run from: days_ago(0) means today,
- the email address to which alerts are sent,
- whether an alert must be sent on failure,
- whether an alert must be sent on retry,
- the number of retries in case of failure, and
- the time delay between retries.
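
The comment in the block above notes that these defaults can be overridden per task. As a rough sketch of that idea in plain Python (this is not Airflow's internal code, and the helper `effective_args` is a hypothetical name used only for illustration), the settings a task ends up with can be thought of as the defaults with the task's own values layered on top:

```python
from datetime import timedelta

# DAG-wide defaults, a shortened copy of the block above
default_args = {
    'owner': 'Ramesh Sannareddy',
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def effective_args(defaults, **task_overrides):
    """Hypothetical helper: start from the defaults, then apply overrides."""
    merged = dict(defaults)        # copy, so the defaults stay untouched
    merged.update(task_overrides)  # per-task values win
    return merged


# A task that needs more retries than the DAG-wide default
load_args = effective_args(default_args, retries=3)

print(load_args['retries'])      # 3
print(load_args['retry_delay'])  # 0:05:00
print(default_args['retries'])   # 1
```

Any setting the task does not override, such as retry_delay here, keeps its default value.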

**A typical DAG definition block looks like this:**

In [None]:
# define the DAG
dag = DAG(
    dag_id='sample-etl-dag',
    default_args=default_args,
    description='Sample ETL DAG using Bash',
    schedule_interval=timedelta(days=1),
)

Here we are creating a variable named dag by instantiating the DAG class with the following parameters.

- sample-etl-dag is the ID of the DAG. This is what you see on the web console.

- We are passing the dictionary default_args, in which all the defaults are defined.

- description helps us in understanding what this DAG does.

- schedule_interval tells us how frequently this DAG runs. In this case, timedelta(days=1) means once every day.
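
To get a feel for what a timedelta-based interval means, the sketch below lists evenly spaced points in time using only the standard library. It illustrates the spacing only; it is not how Airflow's scheduler is implemented, and the function name `interval_boundaries` and the start date are assumptions made for the example.

```python
from datetime import datetime, timedelta


def interval_boundaries(start, interval, count):
    """Return `count` points in time spaced `interval` apart, from `start`."""
    return [start + i * interval for i in range(count)]


# Hypothetical start date; the lab itself uses days_ago(0)
start = datetime(2024, 1, 1)

for boundary in interval_boundaries(start, timedelta(days=1), 3):
    print(boundary.isoformat())
# 2024-01-01T00:00:00
# 2024-01-02T00:00:00
# 2024-01-03T00:00:00
```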

**A typical task definitions block looks like this:**

In [None]:
# define the tasks

# define the first task named extract
extract = BashOperator(
    task_id='extract',
    bash_command='echo "extract"',
    dag=dag,
)

# define the second task named transform
transform = BashOperator(
    task_id='transform',
    bash_command='echo "transform"',
    dag=dag,
)

# define the third task named load

load = BashOperator(
    task_id='load',
    bash_command='echo "load"',
    dag=dag,
)

A task is defined using:

- A task_id, a string that identifies the task.
- The bash command that the task runs.
- The DAG that the task belongs to.

**A typical task pipeline block looks like this:**

In [None]:
# task pipeline
extract >> transform >> load

The task pipeline defines the order in which the tasks run.

Here the task **extract** must run first, followed by **transform**, followed by the task **load**.
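
The `>>` syntax is ordinary Python operator overloading. The toy class below is a minimal sketch of how such chaining can work; `ToyTask` and `run_order` are hypothetical names and this is not Airflow's actual implementation, only an illustration of the idea.

```python
class ToyTask:
    """Hypothetical stand-in for an operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a, then returns b
        # so that a >> b >> c keeps chaining from left to right
        self.downstream.append(other)
        return other


def run_order(first):
    """Follow a linear chain from `first` and return the task ids in order."""
    order = []
    task = first
    while task is not None:
        order.append(task.task_id)
        task = task.downstream[0] if task.downstream else None
    return order


extract = ToyTask('extract')
transform = ToyTask('transform')
load = ToyTask('load')

extract >> transform >> load

print(run_order(extract))  # ['extract', 'transform', 'load']
```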

**Exercise 4 - Create a DAG**

Let us create a DAG that runs daily, extracts user information from a file named datafile, transforms it, and loads it into a file.

This DAG has two tasks: extract, which extracts fields from datafile, and transform_and_load, which transforms the data and loads it into a file.

Create a new file:

- Choose File->New File.
- Name it my_first_dag.py.
- Copy the code below and paste it into my_first_dag.py.

In [None]:
# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago

#defining DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# define the DAG
dag = DAG(
    'my-first-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)

# define the tasks

# define the first task

extract = BashOperator(
    task_id='extract',
    bash_command='cut -d":" -f1,3,6 datafile > /home/project/airflow/dags/extracted-data.txt',
    dag=dag,
)

# define the second task
transform_and_load = BashOperator(
    task_id='transform_and_load',
    bash_command='tr ":" "," < /home/project/airflow/dags/extracted-data.txt > /home/project/airflow/dags/transformed-data.csv',
    dag=dag,
)

# task pipeline
extract >> transform_and_load
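
To see what the two bash commands in this DAG do to a single record, here is a rough Python equivalent. The sample line is hypothetical (the lab does not show the contents of datafile; a colon-delimited layout is assumed), and `cut_fields` is a simplified stand-in for `cut`, not a full reimplementation.

```python
def cut_fields(line, delimiter, fields):
    """Simplified `cut -d<delimiter> -f<fields>`: keep the 1-based fields."""
    parts = line.split(delimiter)
    return delimiter.join(parts[i - 1] for i in fields if i <= len(parts))


# Hypothetical colon-delimited record, assumed for illustration
sample = 'alice:x:1001:1001:Alice:/home/alice:/bin/bash'

extracted = cut_fields(sample, ':', [1, 3, 6])  # like cut -d":" -f1,3,6
transformed = extracted.replace(':', ',')       # like tr ":" ","

print(extracted)    # alice:1001:/home/alice
print(transformed)  # alice,1001,/home/alice
```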

**Exercise 5 - Submit a DAG**

Submitting a DAG is as simple as copying the DAG Python file into the dags folder in the AIRFLOW_HOME directory.

- Open a terminal and run the command below to submit the DAG that was created in the previous exercise.

Note: When submitting the DAG that was created in the previous exercise, prefix the submit command with sudo in the terminal.

In [None]:
cp my_first_dag.py $AIRFLOW_HOME/dags
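
The same copy step can be scripted. The function below is a small sketch using only the standard library; `submit_dag` is a hypothetical helper name, and it assumes you have write permission on the target directory.

```python
import os
import shutil
from pathlib import Path


def submit_dag(dag_file, airflow_home):
    """Copy a DAG file into <airflow_home>/dags and return the new path."""
    dags_dir = Path(airflow_home) / 'dags'
    dags_dir.mkdir(parents=True, exist_ok=True)  # create the folder if missing
    return Path(shutil.copy(dag_file, dags_dir))
```

Calling `submit_dag('my_first_dag.py', os.environ['AIRFLOW_HOME'])` would then mirror the cp command above.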

Verify that the DAG was actually submitted.

Run the command below to list out all the existing DAGs.

In [None]:
#Run the command below to list out all the existing DAGs.

airflow dags list

In [None]:
# Verify that my-first-dag is a part of the output.

airflow dags list|grep "my-first-dag"

In [None]:
#Run the command below to list out all the tasks in my-first-dag

airflow tasks list my-first-dag

In [None]:
airflow dags unpause tutorial

# Problem:

Write a DAG named ETL_Server_Access_Log_Processing.

- Task 1: Create the imports block.

- Task 2: Create the DAG Arguments block. You can use the default settings.

- Task 3: Create the DAG definition block. The DAG should run daily.

- Task 4: Create the download task.

The download task must download the server access log file, which is available at the URL: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt

- Task 5: Create the extract task.

The server access log file contains these fields.

    a. timestamp - TIMESTAMP

    b. latitude - float

    c. longitude - float

    d. visitorid - char(37)

    e. accessed_from_mobile - boolean

    f. browser_code - int

The extract task must extract the fields timestamp and visitorid.

- Task 6: Create the transform task.

The transform task must capitalize the visitorid.

- Task 7: Create the load task.

The load task must compress the extracted and transformed data.

- Task 8: Create the task pipeline block.

The pipeline block should schedule the tasks in the order listed below:

-    download
-    extract
-    transform
-    load

- Task 9: Submit the DAG.

- Task 10: Verify that the DAG is submitted.

# Solution

In [None]:
# import the libraries

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago



#defining DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}




# Defining the DAG

dag = DAG(
    'ETL_Server_Access_Log_Processing',
    default_args=default_args,
    description='ETL for server access log processing',
    schedule_interval=timedelta(days=1),
)




# define the tasks

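# Define the task 'download'
# By default, BashOperator runs each command in its own temporary
# working directory, so every task below uses absolute paths.

download = BashOperator(
    task_id='download',
    bash_command='wget -O /home/project/airflow/dags/web-server-access-log.txt "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"',
    dag=dag,
)



# Define the task 'extract'
# Keep fields 1 (timestamp) and 4 (visitorid) of the '#'-delimited log

extract = BashOperator(
    task_id='extract',
    bash_command='cut -f1,4 -d"#" /home/project/airflow/dags/web-server-access-log.txt > /home/project/airflow/dags/extracted.txt',
    dag=dag,
)



# Define the task 'transform'
# Convert lowercase letters to uppercase

transform = BashOperator(
    task_id='transform',
    bash_command='tr "[a-z]" "[A-Z]" < /home/project/airflow/dags/extracted.txt > /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)



# Define the task 'load'
# Compress the transformed file; -j stores it without its directory path

load = BashOperator(
    task_id='load',
    bash_command='zip -j /home/project/airflow/dags/log.zip /home/project/airflow/dags/capitalized.txt',
    dag=dag,
)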



# task pipeline

download >> extract >> transform >> load
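
To check the logic of the extract, transform, and load steps without running Airflow, the following sketch reproduces them on one record in plain Python. The sample record is hypothetical (the real file contents are not shown in this lab), `extract_and_capitalize` is an illustrative helper name, and the archive is written to memory rather than to disk.

```python
import io
import zipfile


def extract_and_capitalize(line, delimiter='#'):
    """Keep fields 1 and 4 (timestamp, visitorid), then uppercase them."""
    fields = line.rstrip('\n').split(delimiter)
    return delimiter.join([fields[0], fields[3]]).upper()


# Hypothetical record in the field order listed in the problem statement
sample = '2021-05-01 10:00:00#12.5#77.1#abc-123#true#5'

result = extract_and_capitalize(sample)
print(result)  # 2021-05-01 10:00:00#ABC-123

# Compress the transformed data into an in-memory zip archive
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
    archive.writestr('capitalized.txt', result + '\n')

with zipfile.ZipFile(buffer) as archive:
    names = archive.namelist()
print(names)  # ['capitalized.txt']
```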




