# **Apache Airflow**

## **1. What is Apache Airflow?**

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. You define the tasks and the order in which they must run, and Airflow executes them automatically on the schedule you set.

## **2. DAGs**

<img src="./images/dag_example.png" width="700px" />

DAGs (Directed Acyclic Graphs) in Apache Airflow represent the tasks and their order of execution. Think of it as drawing arrows between points on a whiteboard, where each point represents a task and the arrows never loop back to an earlier point.

> Remember, a DAG is a graph data structure, which we have seen in a previous video (Data structures and algorithms with Python)
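To make the "directed" and "acyclic" parts concrete, here is a minimal sketch using Python's standard `graphlib` module (Python 3.9+). The task names `extract`, `transform`, and `load` are illustrative assumptions, and this is not how Airflow stores DAGs internally.

```python
from graphlib import TopologicalSorter, CycleError

# Each key is a task; its value is the set of tasks that must finish first.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid execution order exists because the graph has no cycle.
order = list(TopologicalSorter(pipeline).static_order())
print(order)


def has_cycle(graph):
    """Return True if the dependency graph contains a cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
        return False
    except CycleError:
        return True


# Two tasks waiting on each other can never run: this is not a DAG.
print(has_cycle({"a": {"b"}, "b": {"a"}}))
```

A graph with a cycle has no valid execution order, which is why a workflow must be acyclic.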

### **2.1. Operators**

Operators in Apache Airflow are predefined templates for tasks. They are like superpowers that let you perform specific actions: for example, there are operators for sending emails, running shell commands, executing Python code, and more.

> So, a task is created by using an operator and an operator is an abstraction that defines what has to be done in a task.

### **2.2. Dependencies**

Tasks are connected by dependencies indicating the order in which they should be executed.
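Later in this notebook, dependencies are written with the `>>` and `<<` operators. The toy class below is a sketch of how such operators can be built in plain Python with `__rshift__` and `__lshift__`. `ToyTask` is a hypothetical class for illustration only, not Airflow's implementation.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # self >> other: `other` runs after `self`
        self.downstream.append(other)
        return other  # returning `other` allows chaining: a >> b >> c

    def __lshift__(self, other):
        # self << other: `self` runs after `other`
        other.downstream.append(self)
        return other


a, b, c = ToyTask("a"), ToyTask("b"), ToyTask("c")
a >> b >> c  # a first, then b, then c
d = ToyTask("d")
d << a       # d also runs after a

print([t.task_id for t in a.downstream])
```

Both operators express the same relation; `d << a` is just `a >> d` read from the other side.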

### **2.3. DAG schedule**

The schedule specifies when, and how often, the DAG should be executed.
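The DAG built later in this notebook uses the cron expression `0 22 * * *` (every day at 22:00 UTC). As a rough sketch of what such a daily schedule means, the helper below computes the next 22:00 UTC after a given moment. The function name `next_daily_run` is an assumption for illustration, and the sketch does not model Airflow's own scheduling logic.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(after, hour=22, minute=0):
    """Return the first daily run time strictly after `after` (same timezone)."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        # today's slot has already passed, so use tomorrow's
        candidate += timedelta(days=1)
    return candidate


morning = datetime(2024, 4, 1, 9, 30, tzinfo=timezone.utc)
late_evening = datetime(2024, 4, 1, 23, 0, tzinfo=timezone.utc)

print(next_daily_run(morning))
print(next_daily_run(late_evening))
```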

### **2.4. Connectors to connect Apache Airflow to Cloud services: AWS, GCP, Azure**

Apache Airflow can connect to cloud services like AWS, GCP, and Azure using connectors or hooks. These connectors are pieces of code that enable Airflow to communicate with the cloud services. By establishing connections and providing the necessary credentials, Airflow can interact with the cloud services and execute tasks on them.

## **3. Run Apache Airflow**

* **This is the DAG we will create. Apache Airflow's role is to orchestrate it (manage its execution).**

<img src="./images/dag_example.png" width="700px" />

* **For the transformation task with Python, we need the following libraries to be installed:**

> * Pandas
> * Openpyxl
> * xlrd

### **3.1. Create a Project's folder**

**In this folder, we will build all our DAGs.**

> * Ensure that Python is already installed on your computer
> * Ensure that VS Code is already installed
> * Ensure that WSL and Docker are already installed
> * Ensure that in VS Code, you have installed the Python extension and the WSL extension
> * Ensure that in VS Code, you have installed the official Microsoft Docker extension

### **3.2. Create a Dockerfile**

* **Dockerfile: Definition (at the root of the project's folder)**

> A Dockerfile is a text file containing a series of instructions for creating a Docker image. These instructions specify the various steps needed to configure the environment and dependencies required by an application or service inside a Docker container. The Dockerfile is used as input for the docker build command, which builds a Docker image from the instructions in the file.

* **Utility Here**

> The Dockerfile will contain the instructions needed to build a custom image using the Airflow image as the base image. In it, we'll also create a Python virtual environment in which we'll install our dependencies (pandas, openpyxl, xlrd, ...).



* **Content of our Dockerfile**

<img src="./images/dockerfile.png" width="700px" style="border:3px solid white" />

* **Build the custom image using our Dockerfile**

> **For this to work, you must open Docker Desktop first**

In VS Code:

**Right-click on the Dockerfile and click on `Build Image`**

> **Give a name: let's say mds-airflow:latest**
> > `mds-airflow` is the name
>
> > `latest` is the tag

<img src="./images/dockerfile_exec.png" width="400px" />


**At the end (once the installation is finished), press Enter in the terminal to exit, then close the terminal.**

### **3.3. Create a docker-compose file**

* **docker-compose: general definition**

A Docker Compose file is a YAML configuration file used to define and manage multi-container Docker applications. It allows you to define the services, networks, volumes, and other configurations required to run and orchestrate multiple containers as a single application.

> * **Services:** a service refers to a containerized application or component that is defined in the configuration file. It represents a specific task or functionality within the overall application architecture.
> 
> * **Networks:** a network is a virtual network that allows communication between containers. It isolates and connects containers, enabling them to communicate with each other using container names as hostnames.
> * **Volumes:** a volume is a persistent data storage mechanism in Docker Compose. It provides a way to store and share data between containers or between containers and the host machine. Volumes are used to persist data even when containers are stopped or removed, ensuring data durability and availability.

* **docker-compose: definition in the context of Apache Airflow**

In the context of Apache Airflow, a Docker Compose file can be used to define the various components and dependencies required to run an Airflow deployment. It enables you to specify the Airflow web server, scheduler, worker, and other services, as well as any necessary volumes, networks, and environment variables. With Docker Compose, you can easily define and manage a complete Airflow environment with multiple interconnected containers, simplifying the setup and deployment process.

* **Content of our docker-compose file**

In this, we will do two things:

> * Map port 8080 of the container, on which Airflow runs, to port 8080 on our localhost
>
> * For data persistence, we will create a volume that stores data locally and binds that local folder to a folder in our container

<img src="./images/docker-compose_content.png" width="600px" style="border:3px solid white" />

* **Now, right-click on `docker-compose.yml`, then on `Compose Up`**

### **3.4. Create necessary folders for Airflow: `dags` and `plugins`**

In Apache Airflow, the dags and plugins folders play important roles in organizing and managing your workflows and custom code.

* **`dags` folder**

> This is the default location where you store your DAG files. Airflow scans this folder to discover and schedule the defined DAGs. Any Python file placed in this folder (or its subdirectories) that contains a DAG definition will be automatically detected by Airflow.

* **`plugins` folder**

> This folder is used to store custom Airflow plugins. Plugins are reusable components that can extend the functionality of Airflow. They can include operators, sensors, hooks, macros, and more. By placing your custom plugin files in the plugins folder, Airflow will automatically load and make them available for use in your DAGs.

Then:

* **Check `airflow.cfg` file to ensure airflow will recognize `dags` and `plugins` folders**

* **In the `dags` folder, create a subfolder named: `custom_modules`**

This folder will contain our custom Python code (classes, functions, ...), for example to process data. This custom code can then be called from our DAGs.

> **Note:**
> * When Airflow scans the dags directory, it parses the Python files it finds there. By default (DAG discovery safe mode), it only considers files whose content contains both the strings `airflow` and `dag` (case-insensitive). If a parsed file defines a DAG using the DAG class from the Airflow library, it is considered a DAG file and will be scheduled and executed accordingly.
>
> * A Python file in the dags directory that does not define a DAG is not treated as a DAG itself. Its functions and classes can still be imported and used by DAG files or by other Python modules, which is how we will use `custom_modules`.
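The default discovery heuristic described in the note can be sketched in a few lines. This is a simplified illustration of the safe-mode check (a case-insensitive search for both strings), not Airflow's actual code, and the function name `might_contain_dag` is an assumption.

```python
def might_contain_dag(source: str) -> bool:
    """Simplified safe-mode check: the file must mention both 'dag' and 'airflow'."""
    lowered = source.lower()
    return "dag" in lowered and "airflow" in lowered


dag_file = "from airflow import DAG\ndag = DAG(dag_id='weather_dag')\n"
helper_file = "def kelvin_to_degree_celsius(t):\n    return t - 273.15\n"

print(might_contain_dag(dag_file))     # candidate DAG file
print(might_contain_dag(helper_file))  # skipped by the heuristic
```

A helper module such as the ones in `custom_modules` is skipped by this check, yet it stays importable from DAG files.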



## **4. Create our First DAG**

### **4.1. Create a Python file named `weather_dag.py` and import the following libraries**

In [None]:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.http.sensors.http import HttpSensor
import json
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator


### **4.2. Define the Orchestration parameters for our DAG: default_args**

In [None]:
default_args = {
    'owner': 'mdesafari',  # owner of the DAG
    'depends_on_past': False,  # whether a task depends on the success of its previous runs (True) or not (False)
    'start_date': datetime(2024, 4, 1),  # start date of the DAG. Tasks will only be scheduled after this date.
    'end_date': datetime(2024, 4, 7),  # end date for the DAG
    'email': ['your_email@gmail.com'],  # (optional) list of email addresses to receive notifications
    'email_on_failure': False,  # whether email notifications should be sent on task failure (True) or not (False).
    'email_on_retry': False,  # whether email notifications should be sent on task retry after failure (True) or not (False).
    'retries': 2,  # max number of retry attempts in case of task failure
    'retry_delay': timedelta(minutes=2),  # time interval between retry attempts for failed tasks
}

# note: default_args are passed to every task (operator) of the DAG.
# schedule_interval and catchup are DAG-level parameters: they are ignored when
# placed in default_args, so we pass them directly to DAG(...) in section 4.3.

# cron-like expression '0 22 * * *':
# 0: minute
# 22: hour
# *: day of the month (* means every day of the month)
# *: month (* means every month)
# *: day of the week (* means every day of the week)
# note: we specified an end_date, so the DAG will run while date <= end_date

### **4.3. Define our DAG (in the `weather_dag.py` file)**

In [None]:
dag = DAG(
    dag_id='weather_dag',
    default_args=default_args,
    schedule_interval='0 22 * * *',  # the DAG will run at 22:00 (10 PM) UTC every day
    catchup=False  # whether to run the missed past schedules between start_date and now (True) or not (False)
)

### **4.4. Sign up to OpenWeatherMap to get an API key (needed to access data) + Select an endpoint**

> * **Website:** https://openweathermap.org/
>
> * **Create an account:** click on `Sign in` > Create an account > Follow the instructions
>
> * **Validate your email address**
>
> * **Click on your account > My API keys** then copy the generated API key
>
> **Note:** create your own key. I will deactivate mine to encourage you to make your own.
>
> * **Now, click on API on the tool bar** > Scroll down to `Current & Forecast weather data collection`
>
> * **Click on API doc** > Scroll down to JSON format to see an example of content
>
> * **Scroll down again to API call**
>
> <img src="./images/api_call.png" width="500px" />

<img src="./images/dag_example.png" width="600px" />

### **4.5. Create our Tasks**

#### **Task 1: Check if the weather API is available**

In [None]:
api_key = ''
city_name = 'Paris'
endpoint = f'/data/2.5/weather?q={city_name}&appid={api_key}'

is_weather_api_ready = HttpSensor(
    task_id ='is_weather_api_ready',  # unique identifier for the task
    http_conn_id='weathermap_api',  # Airflow connection ID, which contains the configuration details for the HTTP connection to the weather API.
    endpoint=endpoint,  # API endpoint path to query weather data for the chosen city (Paris), along with the API key (appid) needed for authentication.
    dag=dag # assigns the task to our DAG named 'dag' (and defined above)
)
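The endpoint above interpolates the city name directly into the query string, which works for `Paris` but not for names containing spaces or special characters. A hedged sketch using the standard library's `urlencode` follows; the helper name `build_weather_endpoint` and the placeholder key are assumptions for illustration.

```python
from urllib.parse import urlencode


def build_weather_endpoint(city_name, api_key):
    """Build the relative endpoint with a safely encoded query string."""
    query = urlencode({"q": city_name, "appid": api_key})
    return f"/data/2.5/weather?{query}"


# 'YOUR_API_KEY' is a placeholder, not a real key
print(build_weather_endpoint("Paris", "YOUR_API_KEY"))
print(build_weather_endpoint("New York", "YOUR_API_KEY"))
```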

**Create and configure the http_conn_id for task 1 in Airflow to access to OpenWeather API**

* **Go to Airflow on your web browser:** localhost:8080 > sign in

> * **username:** admin
> * **password:** find it in airflow > standalone_admin_password.txt

* **Click on Admin** > Connections > Add a new connection

> * **Connection id:** `weathermap_api`
>
> * **Connection type:** `HTTP`
>
> * **Host:** `https://api.openweathermap.org`
>
> <img src="./images/api_call.png" width="500px" />
>
> * **Save**

#### **Task 2: Extract Weather Data From OpenWeather API**

In [None]:
extract_weather_data = SimpleHttpOperator(  # operator used to make an HTTP request to extract weather data
    task_id = 'extract_weather_data',  # unique identifier for this task in the DAG
    http_conn_id = 'weathermap_api',  # id of the HTTP connection to be used for the request (we set it in airflow)
    endpoint=endpoint,  # URL endpoint to which the HTTP request will be made
    method = 'GET',  # The operator will perform a GET request to retrieve weather data
    response_filter= lambda r: json.loads(r.text),  # a lambda function that converts the response text into a JSON object
    log_response=True,  # enables logging of the response received. Response details will be logged in the Airflow task logs
    dag=dag
)


#### **Task 3: Transform and Load the Weather Data**

* **In the `custom_modules` folder, create a Python file named: `__init__.py`**

This lets Airflow recognize `custom_modules` as a Python package, so that we can import functions from this folder. Do not skip this step.

> Leave content empty

* **In the `custom_modules` folder, create a Python file named: `transform_load_weather_funcs.py`**

Then, create (copy/paste) the following transformation and loading code

In [None]:
import pandas as pd
import os
import errno
from datetime import timedelta, datetime

def kelvin_to_degree_celsius(temp_in_kelvin):
    return temp_in_kelvin - 273.15


def transform_load_data(task_instance):
    """
    params:
        task_instance: the TaskInstance of the currently running task, injected by Airflow.
            It gives access to data returned by previous tasks through XCom (cross-communication).
    """
    # use xcom_pull to get data of the previous task whose id is 'extract_weather_data'
    data = task_instance.xcom_pull(task_ids="extract_weather_data")

    # extracts the city name from the data.
    city = data["name"]

    # extracts the weather description from the data
    weather_description = data["weather"][0]['description']

    # converts the temperature in Kelvin to degree Celsius
    temp_celsius = kelvin_to_degree_celsius(data["main"]["temp"])

    # converts the "feels like" temperature in Kelvin to degree Celsius
    feels_like_celsius = kelvin_to_degree_celsius(data["main"]["feels_like"])

    # converts the minimum temperature in Kelvin to degree Celsius
    min_temp_celsius = kelvin_to_degree_celsius(data["main"]["temp_min"])

    # converts the maximum temperature in Kelvin to degree Celsius
    max_temp_celsius = kelvin_to_degree_celsius(data["main"]["temp_max"])

    # extracts the atmospheric pressure from the data
    pressure = data["main"]["pressure"]

    # extracts the humidity from the data
    humidity = data["main"]["humidity"]

    # extracts the wind speed from the data
    wind_speed = data["wind"]["speed"]

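    # calculates the timestamp of the weather record, sunrise time, and sunset time
    # note: datetime.fromtimestamp() interprets its argument in the machine's local timezone,
    # so these are correct city-local times only if the container clock is set to UTC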
    time_of_record = datetime.fromtimestamp(data['dt'] + data['timezone'])
    sunrise_time = datetime.fromtimestamp(data['sys']['sunrise'] + data['timezone'])
    sunset_time = datetime.fromtimestamp(data['sys']['sunset'] + data['timezone'])

    # creates a dictionary transformed_data containing the transformed data
    transformed_data = {"City": city,
                        "Description": weather_description,
                        "Temperature (C)": temp_celsius,
                        "Feels Like (C)": feels_like_celsius,
                        "Minimum Temp (C)": min_temp_celsius,
                        "Maximum Temp (C)": max_temp_celsius,
                        "Pressure": pressure,
                        "Humidity": humidity,
                        "Wind Speed": wind_speed,
                        "Time of Record": time_of_record,
                        "Sunrise (Local Time)": sunrise_time,
                        "Sunset (Local Time)": sunset_time
                        }
    
    # creates a list transformed_data_list containing the transformed data dictionary
    transformed_data_list = [transformed_data]

    # converts the transformed data list into a pandas DataFrame.
    df_data = pd.DataFrame(transformed_data_list)

    # get the current date components
    now = pd.Timestamp.now()
    year = now.year
    month = now.month
    day = now.day

    # save the DataFrame as a CSV file with a dynamically generated filename
    # to a local storage (in our airflow directory).
    # In /opt/airflow we will create a directory named 'weather' and a subdirectory with the city's name
    data_dir = f'/opt/airflow/weather/{city}'

    # create the directory path if it does not exist (no error if it already exists)
    os.makedirs(data_dir, exist_ok=True)
    
    # saving path
    path_save = f'{data_dir}/weather_{year}_{month}_{day}.csv'

    # save the dataframe (in csv format)
    df_data.to_csv(path_save, index=False)
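The transformation logic can be tried outside Airflow on a hand-written payload. The sketch below reuses `kelvin_to_degree_celsius` and adds a hypothetical helper, `local_time`, which pins the conversion to UTC so that the result does not depend on the machine's timezone. The sample values are made up for illustration and are not real API output.

```python
from datetime import datetime, timezone


def kelvin_to_degree_celsius(temp_in_kelvin):
    return temp_in_kelvin - 273.15


def local_time(unix_seconds, utc_offset_seconds):
    """Shift a UTC Unix timestamp by the city's offset and return a naive datetime."""
    shifted = datetime.fromtimestamp(unix_seconds + utc_offset_seconds, tz=timezone.utc)
    return shifted.replace(tzinfo=None)


# made-up sample: 2024-04-01 10:00:00 UTC, city offset of +2 hours
sample = {"main": {"temp": 293.15}, "dt": 1711965600, "timezone": 7200}

temp_celsius = round(kelvin_to_degree_celsius(sample["main"]["temp"]), 2)
time_of_record = local_time(sample["dt"], sample["timezone"])

print(temp_celsius)
print(time_of_record)
```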

* **Import the function `transform_load_data` in our dag file**

In [None]:
from custom_modules.transform_load_weather_funcs import transform_load_data

* **Create the transform and load task in our dag file**

In [None]:
transform_load_weather_data = PythonOperator(
    task_id= 'transform_load_weather_data',
    python_callable=transform_load_data,
    dag=dag
)
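The value returned by a task is stored by Airflow as an XCom, and a later task reads it with `xcom_pull`. The toy class below mimics that exchange with a plain dictionary. `ToyTaskInstance` and its `push_return_value` method are hypothetical names used only to illustrate the idea; the real `TaskInstance` is provided by Airflow.

```python
class ToyTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, store):
        self._store = store  # shared dict: task_id -> value returned by that task

    def push_return_value(self, task_id, value):
        self._store[task_id] = value

    def xcom_pull(self, task_ids):
        return self._store.get(task_ids)


def transform(task_instance):
    # same access pattern as transform_load_data: read the upstream task's result
    data = task_instance.xcom_pull(task_ids="extract_weather_data")
    return {
        "city": data["name"],
        "temperature": round(data["main"]["temp"] - 273.15, 2),
    }


ti = ToyTaskInstance(store={})
# made-up payload standing in for the API response
ti.push_return_value("extract_weather_data", {"name": "Paris", "main": {"temp": 293.15}})
result = transform(ti)
print(result)
```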

#### **Define the tasks dependencies (in the dag file)**

In [None]:
is_weather_api_ready >> extract_weather_data >> transform_load_weather_data

## **5. Create our Second DAG: improvement of the first one**

<img src="./images/improve/new_dag_schema.png" width="700px" />

* **Instead of storing the transformed weather data in the local system, we're going to store it in a PostgreSQL database. To do this, we need to update / modify :**

> * dockerfile
> * docker-compose.yml
> * transform_load_data function

* **We also need to create our PostgreSQL database.**

> * Database Name: weather

* **Then, create a task to create a table (`paris`) if it does not exist**

* **Create a Python function to insert data**

* **Create a task that executes this insertion function**

* **Re-build the dependencies**

### **5.1. Modifications: dockerfile, docker-compose.yml, and transform_load_data function**

* **dockerfile**

<img src="./images/improve/dockerfile.png" width="600px"/>

* **docker-compose.yml**

<img src="./images/improve/docker-compose.png" width="400px"/>

* **transform_load_data**

> I created a new Python file in the `custom_modules` folder named `tl_weather_postgres.py`, with the content below:
>
> > * It keeps our previous functions: (1) `kelvin_to_degree_celsius` and (2) `transform_load_data`, which is renamed `transform_data` and modified as shown below
> >
> > * I also added a new function named `insert_data_to_postgres_db` to insert data into our PostgreSQL database (see below)

In [None]:
import pandas as pd
import os
import errno
from datetime import timedelta, datetime
from airflow.providers.postgres.hooks.postgres import PostgresHook


def kelvin_to_degree_celsius(temp_in_kelvin):
    return temp_in_kelvin - 273.15

# we renamed the function from transform_load_data to transform_data
# so, also rename the corresponding task in the dag file (variable name and task_id: transform_weather_data)
def transform_data(task_instance):
    data = task_instance.xcom_pull(task_ids="extract_weather_data")
    city = data["name"]
    weather_description = data["weather"][0]['description']
    temp_celsius = kelvin_to_degree_celsius(data["main"]["temp"])
    time_of_record = datetime.fromtimestamp(data['dt'] + data['timezone'])

    transformed_data = {
        "city": city,
        "description": weather_description,
        "temperature": temp_celsius,
        "time": time_of_record 
    }
    
    return transformed_data

# here also, the id of the upstream task is transform_weather_data instead of transform_load_weather_data
def insert_data_to_postgres_db(task_instance):
    transformed_data = task_instance.xcom_pull(task_ids='transform_weather_data')
    insert_sql = "INSERT INTO paris (city, description, temperature, time) VALUES (%s, %s, %s, %s)"
    pg_hook = PostgresHook(postgres_conn_id='postgres_id')
    pg_hook.run(
        insert_sql,
        parameters=(
            transformed_data['city'],
            transformed_data['description'],
            transformed_data['temperature'],
            transformed_data['time']
        )
    )
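The insert above passes values separately from the SQL text (a parameterized query) instead of formatting them into the string. The same pattern can be tried without a PostgreSQL server by using the standard library's `sqlite3` as a stand-in. Note the assumptions: SQLite uses `?` placeholders where the Postgres driver uses `%s`, the column types are simplified, and the row values are made up.

```python
import sqlite3
from datetime import datetime

# in-memory database as a stand-in for the `weather` PostgreSQL database
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE IF NOT EXISTS paris (
        city TEXT,
        description TEXT,
        temperature REAL,
        time TEXT
    )"""
)

# made-up row with the same keys as transformed_data
row = {
    "city": "Paris",
    "description": "clear sky",
    "temperature": 20.0,
    "time": datetime(2024, 4, 1, 12, 0, 0),
}

# values travel as parameters, never through string formatting
conn.execute(
    "INSERT INTO paris (city, description, temperature, time) VALUES (?, ?, ?, ?)",
    (row["city"], row["description"], row["temperature"], row["time"].isoformat()),
)
conn.commit()

rows = conn.execute("SELECT city, temperature, time FROM paris").fetchall()
print(rows)
```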


### **5.2. Create our PostgreSQL database named `weather`**

* Open pgAdmin
* Authenticate (with your password)
* Create the db: `weather`


### **5.3. Create a task to create a table (`paris`) if it does not exist**

In [None]:
# import PostgresOperator (from the postgres provider package)
from airflow.providers.postgres.operators.postgres import PostgresOperator

# also, do not forget to update the import path of our custom functions:
# tl_weather_postgres instead of transform_load_weather_funcs (as in the previous video)
from custom_modules.tl_weather_postgres import transform_data, insert_data_to_postgres_db


create_weather_table = PostgresOperator(
    task_id='create_weather_table',
    postgres_conn_id='postgres_id',  # id of the Postgres connection that we will create once Airflow is launched
    sql='''CREATE TABLE IF NOT EXISTS paris (
        city VARCHAR(255),
        description VARCHAR(255),
        temperature FLOAT,
        time TIMESTAMP
    );''',
    dag=dag
)

### **5.4. Create a Python function to insert data**

This has already been done when modifying the `transform_load_data` function. See `section 5.1`


### **5.5. Create a task that executes this insertion function**

In [None]:
# insert data in the PostgreSQL db named `weather`, in its table named `paris` (task)
# note: in Airflow 2 the context is passed automatically, so provide_context is no longer needed
insert_data_to_postgres = PythonOperator(
    task_id='insert_data_to_postgres',
    python_callable=insert_data_to_postgres_db,
    dag=dag
)

### **5.6. Re-build the dependencies**

In [None]:
# extract_weather_data is executed after is_weather_api_ready
# transform_weather_data is executed after extract_weather_data
is_weather_api_ready >> extract_weather_data >> transform_weather_data

# to execute create_weather_table, is_weather_api_ready must be completed
create_weather_table << is_weather_api_ready

# to execute insert_data_to_postgres, transform_weather_data and create_weather_table
# must be completed
insert_data_to_postgres << transform_weather_data
insert_data_to_postgres << create_weather_table
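To check which tasks this dependency graph lets run side by side, the sketch below groups the five tasks into successive "waves" with the standard `graphlib` module (Python 3.9+). This is only an illustration of the graph structure: the function name `execution_waves` is an assumption, and Airflow's scheduler starts each task as soon as its own upstream tasks are done rather than waiting for a whole wave.

```python
from graphlib import TopologicalSorter

# each task mapped to the set of tasks it waits for (its upstream tasks)
upstream = {
    "is_weather_api_ready": set(),
    "extract_weather_data": {"is_weather_api_ready"},
    "create_weather_table": {"is_weather_api_ready"},
    "transform_weather_data": {"extract_weather_data"},
    "insert_data_to_postgres": {"transform_weather_data", "create_weather_table"},
}


def execution_waves(graph):
    """Group tasks into waves; tasks in the same wave have no dependency on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output stable
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(upstream)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The second wave contains both `create_weather_table` and `extract_weather_data`, which matches the fan-out after the sensor in the diagram.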

<img src="./images/improve/new_dag.png" width="700px" />

* **Here is the complete dag content**

In [None]:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.http.sensors.http import HttpSensor
import json
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from custom_modules.tl_weather_postgres import transform_data, insert_data_to_postgres_db


default_args = {
    'owner': 'mdesafari',
    'depends_on_past': False,
    'start_date': datetime(2024, 4, 1),
    'end_date': datetime(2024, 4, 7),
    'email': ['your_email@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

# 1. CREATE THE DAG DEFINITION
# schedule_interval and catchup are DAG-level parameters (they are ignored in default_args)
dag = DAG(
    dag_id='weather_dag_postgres',
    default_args=default_args,
    schedule_interval='0 22 * * *',
    catchup=False
)

# 2. CREATE TASKS
# 2.1. Task 1: Check if the weather API is ready
api_key = ''
city_name = 'Paris'
endpoint = f'/data/2.5/weather?q={city_name}&appid={api_key}'

is_weather_api_ready = HttpSensor(
    task_id ='is_weather_api_ready',
    http_conn_id='weathermap_api',
    endpoint=endpoint,
    dag=dag
)

# 2.2. Task 2: Extract weather data from the API
extract_weather_data = SimpleHttpOperator(
    task_id = 'extract_weather_data',
    http_conn_id = 'weathermap_api',
    endpoint=endpoint,
    method = 'GET',
    response_filter= lambda r: json.loads(r.text),
    log_response=True,
    dag=dag
)

# 2.3. Task 3: transform weather data
transform_weather_data = PythonOperator(
    task_id= 'transform_weather_data',
    python_callable=transform_data,
    dag=dag
)


# 3. CREATE POSTGRES TASKS
# 3.1. create table
create_weather_table = PostgresOperator(
    task_id='create_weather_table',
    postgres_conn_id='postgres_id',
    sql='''CREATE TABLE IF NOT EXISTS paris (
        city VARCHAR(255),
        description VARCHAR(255),
        temperature FLOAT,
        time TIMESTAMP
    );''',
    dag=dag
)


# 3.2. insert data
insert_data_to_postgres = PythonOperator(
    task_id='insert_data_to_postgres',
    python_callable=insert_data_to_postgres_db,
    dag=dag
)


# 4. TASKS DEPENDENCIES
is_weather_api_ready >> extract_weather_data >> transform_weather_data
create_weather_table << is_weather_api_ready
insert_data_to_postgres << transform_weather_data
insert_data_to_postgres << create_weather_table


### **Re-build the image (because the Dockerfile was updated) before Compose Up**

### **5.7. Execution**

> * **Create the PostgreSQL connection**
>
> > * **Connection Id:** `postgres_id`
> >
> > * **Connection Type:** `Postgres`
> >
> > * **Host:** `host.docker.internal`
> >
> > * **Database:** `weather`
> >
> > * **Login:** `postgres`
> >
> > * **Password:** `your_postgres_db_password_here`
> >
> > * **Port:** `5432`
>
> * **You will need to re-create the connection to the OpenWeatherMap API, since building a new image and composing up again resets our initial configuration**
>
> > * **Connection Id:** `weathermap_api`
> >
> > * **Connection Type:** `HTTP`
> >
> > * **Host:** `https://api.openweathermap.org`
>
> * **Then Compose Down before Composing Up again**
>
> * **Trigger the DAG manually to check that it works, then check that your PostgreSQL db (`weather`) received the data**