# Stack Overflow Data Pipeline

In this project, I will create a data pipeline in the Cloud using Apache Airflow.

The dataset I will use for this project is an archive of Stack Overflow content.

### Architecture

![flow_of_data](../screenshots/pipeline_data_flow.png)

This diagram illustrates the flow of data from the source database (AWS RDS - Source Database), through to the data processing step (AWS EC2 - Apache Airflow), and finally, insertion into the analytical database (AWS RDS - Analytical Database).

## EC2 Setup

First, I will set up an EC2 instance on AWS.

EC2 is a web service which provides computing capacity in the Cloud. 

I will configure Airflow to run inside my EC2 instance, so that the pipeline runs in the Cloud, independent from my local machine.

First, I log into AWS and create the instance on EC2:

![ec2_instance](../screenshots/ec2_instance.png)


Next, I connect to the EC2 instance via the terminal using a secure shell (SSH) connection.

![ec2_connect](../screenshots/ec2_ssh_connection.png)

To connect, I need to use the `pem` file which was generated when the instance was created.

I save this into my current directory and then run the following command:

In [None]:
ssh -i "batching-project.pem" ec2-user@ec2-18-130-230-199.eu-west-2.compute.amazonaws.com

![ec2_terminal](../screenshots/ec2_terminal.png)

## Apache Airflow

With a connection to the EC2 instance established, I can now install Apache Airflow inside it.

I first create a virtual environment (venv) on the EC2 instance and then install my project dependencies inside it using `pip`.

With all my dependencies installed, I can now run the Airflow server and scheduler inside the EC2, using the following commands:

In [None]:
airflow db init

airflow scheduler

airflow webserver -p 8080

I use `tmux` to run multiple panels in my terminal.

This way, I can easily switch between the webserver, scheduler, EC2 terminal and local terminal.

![airflow_ec2](../screenshots/airflow_running_ec2.png)

Now that the webserver is running, I can visit it using the public EC2 IP address and the Airflow port:

http://18.130.230.199:8080/

To log in, I must first create a user in the EC2 terminal.

In [None]:
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --password example \
    --email spiderman@superhero.org

I am now logged into Airflow running on an EC2 instance.

![airflow](../screenshots/airflow.png)

## Connecting to RDS

With Airflow now set up, I can connect to the two databases held on the AWS Relational Database Service (RDS).

One database contains the raw Stack Overflow data (source), while the other is the analytical database, a currently empty database into which I will load transformed data (target).

To do this, I create two new connections in Airflow, using the relevant RDS credentials.

![rds_connection](../screenshots/rds_connection.png)
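
As an alternative to the web UI shown above, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI. The sketch below builds such a URI for the `stackoverflow_source_db` connection ID used later in the DAG. The user, password, host, and database values are hypothetical placeholders, not this project's credentials, and `airflow_conn_uri` is a helper name introduced here for illustration.

```python
from urllib.parse import quote


def airflow_conn_uri(user, password, host, port, database):
    """Build a postgres:// URI, percent-encoding the credentials."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


# hypothetical placeholder values, not the project's real credentials
uri = airflow_conn_uri(
    user="etl_user",
    password="p@ss/word",
    host="source-db.example.com",
    port=5432,
    database="stackoverflow",
)

# the variable name is AIRFLOW_CONN_ followed by the connection ID in upper case
env_var_name = "AIRFLOW_CONN_" + "stackoverflow_source_db".upper()
print(f"export {env_var_name}='{uri}'")
```

Percent-encoding matters because characters such as `@` or `/` in a password would otherwise be read as part of the URI structure.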

## DAGs

With everything connected up, I can now create my first DAG.

Its goal is as follows:

- Your company wants to ensure that all posts loaded into the target database have a body field that is **not empty**. This is to ensure data quality and consistency in the target database. 
- Additionally, the company wants to avoid reprocessing the same data every time the ETL process runs.
- They are interested in these fields: `id`, `title`, `body`, `owner_user_id`, and `creation_date`.
- The DAG should run every 15 minutes.

To create new files inside my EC2, I opt to connect it to VSCode using a remote SSH extension.

From here, I can easily access the EC2 and create new files.

![vscode](../screenshots/vscode_ssh.png)

## DAG Breakdown

In this section, I will break down the DAG, explaining what happens in each part.

The complete file can be viewed [here](../airflow/dags/stackoverflow_dag.py).

#### Modules 

First, I import the necessary modules:

- `PostgresHook` and `PythonOperator` to access the databases from Python tasks.
- XComs and `Variable` to store information in Airflow, so that it can be shared between separate tasks and DAG runs.
- `logging` to send info to the Airflow log, which is useful for debugging.
- `pandas` to hold each batch of query results as a DataFrame.

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import logging
import pandas as pd

#### Extract Function

Next, I create a Python function to extract data from the source database and push it to `xcoms`.

This particular dataset has 55 million rows, which is far too much to store in `xcoms`. In addition, trying to move this amount of data at once would crash the EC2 instance, due to memory exhaustion.

Therefore, I will process the data in batches of 1000 rows, every 15 minutes. 
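
As a rough sanity check on these settings, the arithmetic below estimates how long one full pass over the dataset would take. It uses only the figures quoted above and assumes every row is loaded, so it is an upper bound: any rows excluded by filtering would shorten it.

```python
from datetime import timedelta

TOTAL_ROWS = 55_000_000            # approximate dataset size quoted above
BATCH_SIZE = 1_000                 # rows extracted per DAG run
RUN_INTERVAL = timedelta(minutes=15)

# ceiling division: number of runs needed to cover every row
runs_needed = -(-TOTAL_ROWS // BATCH_SIZE)
total_time = runs_needed * RUN_INTERVAL

print(f"runs needed: {runs_needed:,}")
print(f"elapsed time: about {total_time.days} days")
```

At 55,000 runs spaced 15 minutes apart, a full pass would take over a year and a half, so these settings favour a light memory footprint per run over total throughput.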

In this case, I use the unique `id` primary key to mark progress in the dataset. Ids are put into ascending order, and the DAG extracts the next 1000 ids based on the id of the last uploaded item. 

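By working through the data in `id` order from a stored bookmark, each run picks up where the previous one finished, so the same rows are not reprocessed. Together with the `ON CONFLICT` clause in the load step, this keeps the pipeline idempotent.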
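
The bookmark approach described above can be sketched in isolation. The example below is a minimal illustration, not the pipeline's own code: it uses an in-memory SQLite table as a hypothetical stand-in for the source database, a batch size of 3 instead of 1000, and a plain variable in place of the stored bookmark. `extract_batch` is a name introduced here for illustration.

```python
import sqlite3

BATCH_SIZE = 3  # tiny batch for the demo; the pipeline uses 1000


def extract_batch(conn, last_loaded_id, batch_size=BATCH_SIZE):
    """Return the next batch of rows whose id is greater than the bookmark."""
    return conn.execute(
        """
        SELECT id, body
        FROM posts
        WHERE body IS NOT NULL
          AND id > ?
        ORDER BY id ASC
        LIMIT ?
        """,
        (last_loaded_id, batch_size),
    ).fetchall()


# hypothetical in-memory stand-in for the source database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, body TEXT)")
conn.executemany(
    "INSERT INTO posts (id, body) VALUES (?, ?)",
    [(1, "a"), (2, None), (3, "c"), (4, "d"), (5, "e"), (6, None), (7, "g")],
)

bookmark = 0  # plays the role of the stored last-loaded id
batches = []
while True:
    batch = extract_batch(conn, bookmark)
    if not batch:
        break  # nothing left after the bookmark
    batches.append([row[0] for row in batch])
    bookmark = batch[-1][0]  # rows are sorted, so the last id is the largest

print(batches, bookmark)
```

Because each query filters on `id > bookmark` rather than using an offset, rows with a `NULL` body are skipped without disturbing the position in the dataset, and every batch starts exactly where the previous one ended.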

In [None]:
def extract(**kwargs):
    # postgres hook to connect to the RDS database, using connection set up earlier
    # in Airflow
    src_db = PostgresHook(postgres_conn_id="stackoverflow_source_db")
    src_conn = src_db.get_conn()

    # read the id of the last loaded item from Variable (defaults to 0 on the first run)
    # this tells the DAG where to start extracting the next
    # 1000 rows of data, similar to a bookmark
    last_loaded_id = Variable.get('last_loaded_id', default_var=0)
    logging.info(f"Extract function: last_loaded_id from Variable is {last_loaded_id}")
    last_loaded_id = int(last_loaded_id)

    # extraction is limited to 1000 rows, data processed in batches
    df = pd.read_sql(
        f'''
        SELECT id, title, body, owner_user_id, creation_date
        FROM posts
        WHERE body IS NOT NULL
        AND owner_user_id IS NOT NULL
        AND id > {last_loaded_id}
        ORDER BY id ASC
        LIMIT 1000;
        ''',
        src_conn
    )

    # data is then pushed to xcoms, ready for use in the next task
    kwargs['ti'].xcom_push(key='dataset', value=df.to_json())
    logging.info("Extract function: pushed dataset to Xcom")

#### Load Function

The load function creates a `posts` table in the analytical database if it does not already exist.

It then pulls the data stored in `xcoms` and inserts it into this table.

The `last_loaded_id` is updated in `Variable` ready for the next DAG run.

In [None]:
def load(**kwargs):
    # hook used to connect to analytical database
    target_db = PostgresHook(postgres_conn_id="analytical_db_rds")

    # posts table created in the analytical db for filtered posts
    create_posts_table = '''
    CREATE TABLE IF NOT EXISTS posts (
    id INT PRIMARY KEY,
    title VARCHAR,
    body VARCHAR,
    owner_user_id INT,
    creation_date TIMESTAMP
    );
    '''

    # SQL to insert data into the analytical db
    # ON CONFLICT clause ensures there will be no duplicates in the dataset
    load_post_data = '''
    INSERT INTO posts (id, title, body, owner_user_id, creation_date)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (id) DO NOTHING;
    '''

    # pull the data from xcoms and load into a dataframe
    df = pd.read_json(kwargs['ti'].xcom_pull(key='dataset'))
    logging.info(f"Load function: pulled dataset from Xcom. DataFrame shape is {df.shape}")

    df = df[['id', 'title', 'body', 'owner_user_id', 'creation_date']]
    # cast 'creation_date' to a datetime object ready for load
    # as returned as BigInt datatype from xcoms
    df['creation_date'] = pd.to_datetime(df['creation_date'], unit='ms')

    # open connection with database and insert data
    with target_db.get_conn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(create_posts_table)
            for row in df.itertuples():
                data = row[1:]
                logging.info(f'Loading data: {data}')
                cursor.execute(load_post_data, data)
            conn.commit()

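    # update last_loaded_id Variable for next DAG run
    # an empty batch has no max id (max() returns NaN), so leave the bookmark unchanged
    if df.empty:
        logging.info("Load function: no new rows, last_loaded_id unchanged")
        return
    max_id = int(df['id'].max())
    Variable.set('last_loaded_id', max_id)
    logging.info(f"Load function: last_loaded_id pushed to Variable is {max_id}")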
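
To illustrate why the `ON CONFLICT (id) DO NOTHING` clause makes the load step safe to repeat, here is a minimal sketch that replays the same batch twice. It is an assumption-laden stand-in rather than the pipeline's code: it uses an in-memory SQLite database (version 3.24 or later, which accepts the same clause) instead of PostgreSQL, a reduced two-column table, and a hypothetical helper named `load_batch`. It also shows one way to handle an empty batch, by returning no new bookmark.

```python
import sqlite3

INSERT_POST = """
INSERT INTO posts (id, title)
VALUES (?, ?)
ON CONFLICT (id) DO NOTHING
"""


def load_batch(conn, rows):
    """Insert a batch and return its largest id, or None for an empty batch."""
    if not rows:
        return None  # nothing to load, so the caller keeps its old bookmark
    with conn:  # commits on success, rolls back if an insert raises
        conn.executemany(INSERT_POST, rows)
    return max(row[0] for row in rows)


# hypothetical in-memory stand-in for the analytical database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY, title TEXT)")

batch = [(1, "first"), (2, "second")]
first_bookmark = load_batch(conn, batch)
second_bookmark = load_batch(conn, batch)  # replaying the same batch
row_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
empty_bookmark = load_batch(conn, [])

print(first_bookmark, second_bookmark, row_count, empty_bookmark)
```

Replaying the batch leaves the table with the same two rows and yields the same bookmark, which is the property that lets a failed or repeated DAG run be retried without creating duplicates.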

<hr>

## Screenshots of Pipeline in Action

Successful run of extract and load tasks.

![dag_graph](../screenshots/dag_graph.png)

Log from `load_task` showing data being pulled from `xcoms` and inserted into the analytical database. 

![airflow_log](../screenshots/airflow_log.png)

Overview of the `stackoverflow_dag`. It took quite a few runs before I got it working!

![dag_overview](../screenshots/dag_overview.png)

Viewing the analytical database in TablePlus confirms that data is being filtered and inserted in batches of 1,000.

![table_plus](../screenshots/table_plus.png)