## TUTORIAL NOTEBOOK 
# HOW TO BUILD AN ETL DATA PIPELINE HOSTED ON AMAZON REDSHIFT WITH APACHE AIRFLOW 


### by Tran Nguyen

## Table of Contents
- [1. Introduction](#introduction)
- [2. Practice - process of building a data pipeline hosted on Amazon Redshift using Apache Airflow](#process)
    + [2.1. Build and launch a Redshift cluster using IaC](#launcher)
    + [2.2. Set up credentials and connection to Redshift cluster for Apache Airflow](#airflow_connection)
    + [2.3. Create and run an Airflow ETL DAG](#run-dag)
    + [2.4. Clean up the resources](#clean-up)
- [3. Conclusions](#conclusions)

<a id='introduction'></a>
## 1. INTRODUCTION

### 1.1. AMAZON REDSHIFT

- Amazon Redshift is a cloud-managed, column-oriented, massively parallel processing (MPP) database. It is based on a modified version of PostgreSQL, in which each table is split into many pieces that are distributed across slices on different machines (compute nodes).
- Some good resources for learning Redshift:
    + https://aws.amazon.com/blogs/big-data/top-8-best-practices-for-high-performance-etl-processing-using-amazon-redshift/

    + https://aws.amazon.com/blogs/big-data/how-i-built-a-data-warehouse-using-amazon-redshift-and-aws-services-in-record-time/

    + https://panoply.io/data-warehouse-guide/redshift-etl/

    + https://d1.awsstatic.com/whitepapers/enterprise-data-warehousing-on-aws.pdf
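To make "distributed across slices" more concrete, here is a toy model of how rows could be spread under Redshift's EVEN, KEY and ALL distribution styles. It is only a sketch under stated assumptions: the `distribute` function, the bucket count, and the use of `zlib.crc32` as the hash are illustrative choices, not Redshift's real internals. Each bucket stands in for a slice (EVEN, KEY) or a node (ALL).

```python
import zlib
from collections import defaultdict


def distribute(rows, num_buckets, style, key=None):
    """Toy model of Redshift-style row distribution (not Redshift's real hashing).

    Buckets stand in for slices (EVEN, KEY) or nodes (ALL).
    """
    buckets = defaultdict(list)
    if style == "EVEN":
        # Round-robin: row i goes to bucket i mod num_buckets.
        for i, row in enumerate(rows):
            buckets[i % num_buckets].append(row)
    elif style == "KEY":
        # Rows sharing a key value always land in the same bucket,
        # so joins on that key do not need to move data between buckets.
        for row in rows:
            h = zlib.crc32(str(row[key]).encode("utf-8"))
            buckets[h % num_buckets].append(row)
    elif style == "ALL":
        # Every bucket keeps a full copy of the table.
        for b in range(num_buckets):
            buckets[b].extend(rows)
    else:
        raise ValueError(f"unknown distribution style: {style}")
    return dict(buckets)


trips = [{"trip_id": i, "from_station_id": i % 3} for i in range(6)]
even = distribute(trips, 2, "EVEN")
by_key = distribute(trips, 2, "KEY", key="from_station_id")
full = distribute(trips, 2, "ALL")
print({b: len(r) for b, r in even.items()}, {b: len(r) for b, r in full.items()})
```

The `trips` table later in this tutorial is created with `DISTSTYLE ALL`, which corresponds to the last case: storage is traded for join locality because every node holds the whole table.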

### 1.2. AIRFLOW COMPONENTS

- Apache Airflow is a widely used workflow management system (WMS) that gives data engineers a friendly platform to automate, monitor and maintain complex data pipelines. It started at Airbnb in 2014, later became an open-source project with an excellent UI, and has become a popular choice among developers.

- Airflow has 5 main components:
    - Scheduler: Starts DAGs based on triggers or schedules and moves them towards completion.
    - Work Queue: Used by the scheduler in most Airflow installations to deliver the tasks that need to be run to the workers.
    - Workers: Run individual pipeline tasks and record their outcomes.
    - Database/Metadata database: Stores credentials, connections, history and configuration, as well as the state of every task in the system.
    - UI/Web Server: Provides a control interface for users and maintainers.
- Airflow connects to external systems and databases through reusable interfaces called hooks. Hooks are available for many different systems; this tutorial uses the PostgresHook to connect Airflow to Amazon Redshift, which is compatible with the PostgreSQL protocol.
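The hook idea can be illustrated without Airflow. Airflow's `PostgresHook` builds on a generic DB-API hook that exposes methods such as `run()` and `get_records()`; the hypothetical `ToyDbHook` below is backed by SQLite and is not Airflow's API. It only shows the pattern: a connection id is resolved to concrete settings (Airflow keeps these in its metadata database), and the hook hides the connect/execute/commit boilerplate.

```python
import sqlite3


class ToyDbHook:
    """Hypothetical stand-in for Airflow's hook pattern (not Airflow's API)."""

    def __init__(self, conn_id, registry):
        self.conn_id = conn_id
        self.registry = registry  # conn_id -> connection settings
        self._conn = None

    def get_conn(self):
        # Cached so the in-memory SQLite database survives between calls.
        if self._conn is None:
            self._conn = sqlite3.connect(self.registry[self.conn_id])
        return self._conn

    def run(self, sql):
        # Execute one or more statements and commit them.
        conn = self.get_conn()
        conn.executescript(sql)
        conn.commit()

    def get_records(self, sql):
        # Return all result rows as a list of tuples.
        return self.get_conn().execute(sql).fetchall()


# Hypothetical registry mapping a connection id to an SQLite database path.
registry = {"demo_db": ":memory:"}
hook = ToyDbHook("demo_db", registry)
hook.run("CREATE TABLE t (x INTEGER); INSERT INTO t VALUES (1), (2);")
print(hook.get_records("SELECT SUM(x) FROM t"))
```

Task code then only needs the connection id (`"redshift"` in this tutorial), not the host, user or password.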

### 1.3. USE CASE OF AIRFLOW & AMAZON REDSHIFT

- A standard ETL data pipeline hosted on Amazon Redshift could be a workflow that extracts data from S3, stages it in Redshift, and transforms it into a set of dimensional tables.
- The whole ETL pipeline can be implemented as a DAG on Apache Airflow, so that the data pipeline can be scheduled, monitored and maintained automatically.
- In this tutorial, I will walk you through the whole process of building a data pipeline hosted on Amazon Redshift using Apache Airflow.
- The process includes 4 steps. Steps 1 and 4 can be done from this Jupyter notebook:
    + Step 1: Build and launch a Redshift cluster using IaC (Infrastructure-as-code).
    + Step 2: Set up credentials and a connection to the Redshift cluster for Apache Airflow.
    + Step 3: Run an Airflow DAG that implements a simple ETL data pipeline hosted on Redshift.
    + Step 4: Clean up the AWS resources using IaC.
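A DAG is a set of tasks plus "runs after" edges: a task runs only once its upstream tasks have finished. As a rough illustration (this is not how Airflow's scheduler is implemented), Python's standard `graphlib` module (Python 3.9+) can compute a valid execution order for the three task ids used by the DAG in section 2.3.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "load_from_s3_to_redshift": {"create_table"},
    "calculate_location_traffic": {"load_from_s3_to_redshift"},
}

# static_order() yields every task after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

`TopologicalSorter` also raises a `CycleError` if the edges form a loop, which mirrors why Airflow pipelines must be acyclic.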

<a id='process'></a>
## 2. PRACTICE - PROCESS OF BUILDING A DATA PIPELINE HOSTED ON AMAZON REDSHIFT USING APACHE AIRFLOW
<a id='launcher'></a>
### 2.1. STEP 1: BUILD AND LAUNCH A REDSHIFT CLUSTER

In [63]:
### Import necessary packages
import boto3
import json
import configparser

import pandas as pd
import matplotlib.pyplot as plt
from time import time
%load_ext sql



#### 2.1.1. GENERATE AN IAM USER WITH AWS SECRET AND ACCESS KEY

- Create a new IAM user in the AWS account.
- Attach the `AdministratorAccess` policy from the `Attach existing policies directly` tab.
- Take note of the access key ID and the secret access key.
- Edit the file `dwh.cfg` in the same folder as this notebook and fill in:
<font color='red'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>
</font>

- This file contains all the parameters for creating the AWS Redshift cluster. You can modify them if necessary, such as the cluster type, database user, password, etc.
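For reference, a `dwh.cfg` with both sections might look like the sample below. It is a sketch: the key names are the ones read with `config.get(...)` in this section, the `[DWH]` values match the parameter table printed in this tutorial, and the `[AWS]` values are placeholders. Note that `configparser` always returns strings, so numeric settings need explicit conversion.

```python
import configparser

# Sample dwh.cfg contents; the [AWS] values are placeholders, not real keys.
SAMPLE_DWH_CFG = """
[AWS]
KEY = YOUR_AWS_KEY
SECRET = YOUR_AWS_SECRET

[DWH]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_IAM_ROLE_NAME = dwhRole
DWH_CLUSTER_IDENTIFIER = dwhCluster
DWH_DB = dwh
DWH_DB_USER = dwhuser
DWH_DB_PASSWORD = Passw0rd
DWH_PORT = 5439
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_DWH_CFG)

# getint() converts the string value; get() would return "4" and "5439".
num_nodes = sample_config.getint("DWH", "DWH_NUM_NODES")
port = sample_config.getint("DWH", "DWH_PORT")
print(sample_config.get("DWH", "DWH_CLUSTER_TYPE"), num_nodes, port)
```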

##### Load DWH Params from the file `dwh.cfg`

In [64]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

| | Param | Value |
|---|---|---|
| 0 | DWH_CLUSTER_TYPE | multi-node |
| 1 | DWH_NUM_NODES | 4 |
| 2 | DWH_NODE_TYPE | dc2.large |
| 3 | DWH_CLUSTER_IDENTIFIER | dwhCluster |
| 4 | DWH_DB | dwh |
| 5 | DWH_DB_USER | dwhuser |
| 6 | DWH_DB_PASSWORD | Passw0rd |
| 7 | DWH_PORT | 5439 |
| 8 | DWH_IAM_ROLE_NAME | dwhRole |


#### 2.1.2. CREATE CLIENTS FOR IAM, EC2, S3 AND REDSHIFT

In [65]:
### Choosing an AWS region 
target_region = "us-west-2"

ec2 = boto3.resource('ec2',
                       region_name = target_region,
                       aws_access_key_id = KEY,
                       aws_secret_access_key = SECRET
                    )

s3 = boto3.resource('s3',
                       region_name = target_region,
                       aws_access_key_id = KEY,
                       aws_secret_access_key = SECRET
                   )

iam = boto3.client('iam',
                       region_name = target_region,
                       aws_access_key_id = KEY,
                       aws_secret_access_key = SECRET
                  )

redshift = boto3.client('redshift',
                       region_name = target_region,
                       aws_access_key_id = KEY,
                       aws_secret_access_key = SECRET
                       )

#### 2.1.3. CREATE IAM ROLE
- Create an IAM role that allows Redshift to read from S3 buckets (read-only access)

In [66]:
from botocore.exceptions import ClientError

### Create the role
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::543309725137:role/dwhRole
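The role setup above combines two different policies. The trust policy passed as `AssumeRolePolicyDocument` says *who* may assume the role (the Redshift service), while the attached `AmazonS3ReadOnlyAccess` policy says *what* the role may do. A minimal sketch that pulls the trust policy into a reusable function (the name `redshift_trust_policy` is hypothetical) makes that structure easier to see:

```python
import json


def redshift_trust_policy(service="redshift.amazonaws.com"):
    """Trust policy allowing the given AWS service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# The JSON string that would be passed as AssumeRolePolicyDocument.
doc = json.dumps(redshift_trust_policy(), indent=2)
print(doc)
```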


#### 2.1.4. CREATE REDSHIFT CLUSTER
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [67]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType = DWH_CLUSTER_TYPE,
        NodeType = DWH_NODE_TYPE,
        NumberOfNodes = int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName = DWH_DB,
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
        MasterUsername = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)
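One detail worth guarding against is the node count. My understanding of the `create_cluster` documentation is that `NumberOfNodes` is not required for a `single-node` cluster and is required (at least 2) for a `multi-node` cluster. The hypothetical helper below assembles the keyword arguments under that assumption and converts the string read from `dwh.cfg` to an integer; with a real client the result would be passed as `redshift.create_cluster(**kwargs)`.

```python
def build_create_cluster_kwargs(cluster_type, node_type, num_nodes, db_name,
                                cluster_id, user, password, role_arn):
    """Hypothetical helper: keyword arguments for redshift.create_cluster."""
    kwargs = {
        "ClusterType": cluster_type,
        "NodeType": node_type,
        "DBName": db_name,
        "ClusterIdentifier": cluster_id,
        "MasterUsername": user,
        "MasterUserPassword": password,
        "IamRoles": [role_arn],
    }
    if cluster_type == "multi-node":
        n = int(num_nodes)  # config values arrive as strings
        if n < 2:
            raise ValueError("a multi-node cluster needs at least 2 nodes")
        kwargs["NumberOfNodes"] = n
    elif cluster_type != "single-node":
        raise ValueError(f"unexpected cluster type: {cluster_type}")
    # For single-node clusters NumberOfNodes is simply left out.
    return kwargs


# 123456789012 is a placeholder account id.
kwargs = build_create_cluster_kwargs(
    "multi-node", "dc2.large", "4", "dwh", "dwhCluster",
    "dwhuser", "Passw0rd", "arn:aws:iam::123456789012:role/dwhRole")
print(kwargs["NumberOfNodes"])
```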

##### Describe the cluster to see its status
- Run the block below several times until the cluster status becomes `available` (around 5-10 minutes)

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None = no truncation; -1 is deprecated since pandas 1.0
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data = x, columns = ["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)
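Instead of re-running the cell by hand, the status check can be automated. boto3 already ships a Redshift waiter for this (`redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)`); the sketch below shows the underlying polling idea with an injectable status function and sleep, so it can be tried without AWS. The `wait_for_status` name and its defaults are assumptions.

```python
import time


def wait_for_status(get_status, target="available", interval=30.0,
                    timeout=900.0, sleep=time.sleep):
    """Call get_status() every `interval` seconds until it returns `target`.

    Raises TimeoutError once about `timeout` seconds of sleeping have passed.
    """
    waited = 0.0
    while True:
        status = get_status()
        if status == target:
            return status
        if waited >= timeout:
            raise TimeoutError(f"status still {status!r} after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Against the real cluster (not executed here), the status function could be:
# lambda: redshift.describe_clusters(
#     ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]['ClusterStatus']

# Offline demo with a fake status sequence and a no-op sleep.
fake_statuses = iter(["creating", "creating", "available"])
print(wait_for_status(lambda: next(fake_statuses), sleep=lambda seconds: None))
```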

#### Take note of the cluster <font color='red'> endpoint and role ARN </font> 

In [None]:
### DO NOT RUN THIS unless the cluster status becomes "available"
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

#### 2.1.5. OPEN AN INCOMING TCP PORT TO ACCESS THE CLUSTER ENDPOINT

In [None]:
try:
    vpc = ec2.Vpc(id = myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName = defaultSg.group_name,
        CidrIp = '0.0.0.0/0',  # open to every IP address: fine for a short demo, too permissive for real use
        IpProtocol = 'TCP',
        FromPort = int(DWH_PORT),
        ToPort = int(DWH_PORT)
    )
except Exception as e:
    print(e)
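The rule above opens port 5439 to every address on the internet. A narrower option is to allow only your own public IPv4 address as a `/32` range. The hypothetical helper below validates the address with the standard `ipaddress` module; its result would replace `'0.0.0.0/0'` as the `CidrIp` argument.

```python
import ipaddress


def ipv4_host_cidr(ip):
    """Return a /32 CIDR block that matches exactly one IPv4 address."""
    addr = ipaddress.IPv4Address(ip)  # raises AddressValueError (a ValueError) if malformed
    return f"{addr}/32"


# 203.0.113.7 is a documentation-only address; substitute your own public IP.
print(ipv4_host_cidr("203.0.113.7"))
```

Keep in mind that a home or office IP address can change, in which case the ingress rule has to be updated.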

#### 2.1.6. VALIDATE THE CONNECTION TO THE CLUSTER

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
print(conn_string)
%sql $conn_string
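The connection string above inserts the password as-is. If a password contains characters such as `:`, `#`, `%` or `?`, the URL can be parsed incorrectly. A minimal sketch (the function name is hypothetical) that percent-encodes the credentials with `urllib.parse.quote_plus` before building the same `postgresql://` URL:

```python
from urllib.parse import quote_plus


def build_conn_string(user, password, host, port, db):
    """postgresql:// URL with percent-encoded user name and password."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db
    )


print(build_conn_string("dwhuser", "pa:ss#w%rd", "example.com", 5439, "dwh"))
```

SQLAlchemy-style URL parsers decode the percent-escapes again, so the database still receives the original password.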

<a id='airflow_connection'></a>
### 2.2. SET UP CREDENTIALS & CONNECTION TO AWS REDSHIFT CLUSTER

#### 2.2.1. SET UP CREDENTIALS
On the Apache Airflow UI:

1. Open Admin -> Connections
2. Click "Create"
3. Set "Conn Id" to "aws_credentials" and "Conn Type" to "Amazon Web Services"
4. Set "Login" to your AWS access key ID and "Password" to your AWS secret access key
5. Click "Save"

<img src="img/aws_credentials.png" width="80%"/>

#### 2.2.2.  SET UP CONNECTION TO REDSHIFT
On the Apache Airflow UI:

1. Open Admin -> Connections
2. Click "Create"
3. Fill in the information as below

In [76]:
print(f"Conn Id :: redshift\nConn Type :: Postgres\nHost :: {DWH_ENDPOINT}\nSchema :: {DWH_DB}\n"
      f"Login :: {DWH_DB_USER}\nPassword :: {DWH_DB_PASSWORD}\nPort :: {DWH_PORT}")

Conn Id :: redshift
Conn Type :: Postgres
Host :: dwhcluster.cscn3pfaocgv.us-west-2.redshift.amazonaws.com
Schema :: dwh
Login :: dwhuser
Password :: Passw0rd
Port :: 5439


<img src="img/redshift.png" width="80%"/>
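Instead of using the UI form, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (here `AIRFLOW_CONN_REDSHIFT`) whose value is a URI such as `postgres://user:password@host:5439/dwh`. The sketch below is a simplified illustration, not Airflow's own parser, of how the URI's parts line up with the form fields above; the host name is a placeholder.

```python
from urllib.parse import unquote, urlsplit


def airflow_fields_from_uri(uri):
    """Map a connection URI onto Airflow's connection form fields (simplified)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,          # "postgres" -> Conn Type Postgres
        "host": parts.hostname,
        "schema": parts.path.lstrip("/"),   # database name
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
    }


# Placeholder host; use the DWH_ENDPOINT printed above.
uri = "postgres://dwhuser:Passw0rd@dwhcluster.example.us-west-2.redshift.amazonaws.com:5439/dwh"
print(airflow_fields_from_uri(uri))
```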

<a id='run-dag'></a>
### 2.3. CREATE AND RUN AN AIRFLOW ETL DAG

- This example uses the bike-share trip data stored in a public S3 bucket from Udacity: `"s3://udacity-dend/data-pipelines/divvy/unpartitioned/divvy_trips_2018.csv"`.
- In this example, the data in `divvy_trips_2018.csv` is loaded into a Redshift staging table called `trips`. The `trips` data is then used to build a traffic analysis table called `station_traffic`, where `num_departures` counts the trips that start at each station (`from_station_id`) and `num_arrivals` counts the trips that end there (`to_station_id`).

<img src="biking_erd.png" width="50%"/>

(ERD diagram was made by using https://dbdiagram.io/)
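The `station_traffic` logic can be checked locally before touching Redshift. The sketch below runs the tutorial's `STATION_TRAFFIC_SQL` against an in-memory SQLite database as a stand-in for Redshift (an assumption: only the query logic is being checked, not Redshift-specific behavior), using a `trips` table reduced to the columns the query touches and a few made-up rows.

```python
import sqlite3

# The tutorial's station_traffic query, unchanged apart from whitespace.
STATION_TRAFFIC_SQL = """
    DROP TABLE IF EXISTS station_traffic;
    CREATE TABLE station_traffic AS
    SELECT
        DISTINCT(t.from_station_id) AS station_id,
        t.from_station_name AS station_name,
        num_departures,
        num_arrivals
    FROM trips t
    JOIN (
        SELECT from_station_id, COUNT(from_station_id) AS num_departures
        FROM trips
        GROUP BY from_station_id
    ) AS s1 ON t.from_station_id = s1.from_station_id
    JOIN (
        SELECT to_station_id, COUNT(to_station_id) AS num_arrivals
        FROM trips
        GROUP BY to_station_id
    ) AS s2 ON t.from_station_id = s2.to_station_id;
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (trip_id INTEGER, from_station_id INTEGER, "
             "from_station_name TEXT, to_station_id INTEGER)")
# Made-up sample rows: (trip_id, from_station_id, from_station_name, to_station_id)
conn.executemany("INSERT INTO trips VALUES (?, ?, ?, ?)", [
    (1, 1, "A", 2),
    (2, 1, "A", 2),
    (3, 2, "B", 1),
    (4, 3, "C", 1),  # station 3 has a departure but no arrivals
])
conn.executescript(STATION_TRAFFIC_SQL)
rows = conn.execute("SELECT * FROM station_traffic ORDER BY station_id").fetchall()
print(rows)
```

Station 3 does not appear in the result: both joins are inner joins, so a station with departures but no recorded arrivals is dropped. A `LEFT JOIN` with `COALESCE(num_arrivals, 0)` would keep it, if that is the intended behavior.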


- Below is the code for this end-to-end data pipeline, implemented as an Airflow DAG. The same code can be saved as a Python script in Airflow's DAGs folder so that Airflow picks it up.

In [None]:
import datetime
import logging

# Airflow 1.10-style import paths. Airflow 2 moves the AWS and Postgres classes
# into provider packages (apache-airflow-providers-amazon, apache-airflow-providers-postgres).
from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

##### ------ Define python function -------
def load_data_to_redshift(*args, **kwargs):
    """
    Load data from S3 into Redshift using the COPY command.
    """
    aws_hook = AwsHook("aws_credentials") # from Step 2.2.1. Set up credentials
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook("redshift") # from Step 2.2.2. Set up connection to Redshift
    ## Log the access key id to check that the right credentials were picked up,
    # since they are often mangled during copy/paste; never log the secret key
    logging.info("Using AWS access key id %s", credentials.access_key)
    ## Run the COPY command; the two remaining '{}' placeholders receive the credentials
    redshift_hook.run(COPY_ALL_TRIPS_SQL.format(credentials.access_key, credentials.secret_key))

##### ------ Define SQL statements -------

CREATE_TRIPS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS trips (
    trip_id INTEGER NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP NOT NULL,
    bikeid INTEGER NOT NULL,
    tripduration DECIMAL(16,2) NOT NULL,
    from_station_id INTEGER NOT NULL,
    from_station_name VARCHAR(100) NOT NULL,
    to_station_id INTEGER NOT NULL,
    to_station_name VARCHAR(100) NOT NULL,
    usertype VARCHAR(20),
    gender VARCHAR(6),
    birthyear INTEGER,
    PRIMARY KEY(trip_id))
    DISTSTYLE ALL;
    """

COPY_SQL = """
    COPY {}
    FROM '{}'
    ACCESS_KEY_ID '{{}}'
    SECRET_ACCESS_KEY '{{}}'
    IGNOREHEADER 1
    DELIMITER ','
    """

COPY_ALL_TRIPS_SQL = COPY_SQL.format(
    "trips",
    "s3://udacity-dend/data-pipelines/divvy/unpartitioned/divvy_trips_2018.csv"
)

STATION_TRAFFIC_SQL = """
    DROP TABLE IF EXISTS station_traffic;
    CREATE TABLE station_traffic AS
    SELECT
        DISTINCT(t.from_station_id) AS station_id,
        t.from_station_name AS station_name,
        num_departures,
        num_arrivals
    FROM trips t
    JOIN (
        SELECT
            from_station_id,
            COUNT(from_station_id) AS num_departures
        FROM trips
        GROUP BY from_station_id
    ) AS s1 ON t.from_station_id = s1.from_station_id
    JOIN (
        SELECT
            to_station_id,
            COUNT(to_station_id) AS num_arrivals
        FROM trips
        GROUP BY to_station_id
    ) AS s2 ON t.from_station_id = s2.to_station_id
    """

##### ------ Define dag -------
dag = DAG(
    'data_pipeline_airflow_Redshift',
    # Use a fixed start_date: Airflow advises against dynamic values such as datetime.now()
    start_date = datetime.datetime(2021, 1, 1),
    # Do not backfill one run per schedule interval since start_date
    catchup = False
)

##### ------ Define tasks -------
### --- Task with PostgresOperator ---
# Create table
create_table = PostgresOperator(
    task_id = "create_table",
    dag = dag,
    postgres_conn_id = "redshift",
    sql = CREATE_TRIPS_TABLE_SQL
)
# Traffic analysis
location_traffic_task = PostgresOperator(
    task_id = "calculate_location_traffic",
    dag = dag,
    postgres_conn_id = "redshift",
    sql = STATION_TRAFFIC_SQL
)

### --- Task with PythonOperator ---
copy_task = PythonOperator(
    task_id = 'load_from_s3_to_redshift',
    dag = dag,
    python_callable = load_data_to_redshift
)


##### ------ Configure the task dependencies -------
# Task dependencies such that the graph looks like the following:
# create_table -> copy_task -> location_traffic_task

create_table >> copy_task
copy_task >> location_traffic_task
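The COPY statement above is formatted in two stages, which is easy to misread. In `COPY_SQL`, the doubled braces `{{}}` survive the first `.format()` call as literal `{}`, so `COPY_ALL_TRIPS_SQL` still contains two empty placeholders that `load_data_to_redshift` fills with the credentials at run time. The standalone sketch below replays both stages with fake credentials (`ACCESS_KEY`, `SECRET_KEY` and the `redact` helper are hypothetical) and shows one way to keep the secret out of task logs, since logging the fully formatted statement would write the secret key in plain text.

```python
# Fake credentials standing in for what AwsHook.get_credentials() returns.
ACCESS_KEY = "AKIAEXAMPLE"
SECRET_KEY = "example-secret"

COPY_SQL = """
    COPY {}
    FROM '{}'
    ACCESS_KEY_ID '{{}}'
    SECRET_ACCESS_KEY '{{}}'
    IGNOREHEADER 1
    DELIMITER ','
    """

# Stage 1 (when the DAG file is parsed): fill in table and S3 path.
# Each '{{}}' becomes a literal '{}' that is left for stage 2.
COPY_ALL_TRIPS_SQL = COPY_SQL.format(
    "trips",
    "s3://udacity-dend/data-pipelines/divvy/unpartitioned/divvy_trips_2018.csv",
)

# Stage 2 (inside the task at run time): fill in the credentials.
final_sql = COPY_ALL_TRIPS_SQL.format(ACCESS_KEY, SECRET_KEY)


def redact(sql, secret):
    """Mask a secret before a statement is written to a log."""
    return sql.replace(secret, "****")


print(redact(final_sql, SECRET_KEY))
```

Because the cluster was launched with the S3 read-only role from step 2.1.3, COPY's `IAM_ROLE '<role ARN>'` authorization clause is another option that keeps access keys out of the SQL entirely.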

<a id='clean-up'></a>
### 2.4. CLEAN UP THE RESOURCES
DO NOT RUN THIS UNLESS YOU ARE SURE YOU WANT TO DELETE YOUR CLUSTER

In [None]:
#### CAREFUL!!
#-- Run to delete the cluster. SkipFinalClusterSnapshot=True skips the final snapshot, so you are
#-- not billed for storing one afterwards, but the cluster's data is permanently lost
redshift.delete_cluster(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot = True)

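**Run the block below several times until the cluster is deleted. Once it is gone, `describe_clusters` raises a `ClusterNotFound` error instead of returning the cluster's properties (boto3's `cluster_deleted` waiter can also block until then):**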

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
#### CAREFUL!!
#-- Run to delete the IAM role; the policy must be detached before the role can be deleted
iam.detach_role_policy(RoleName = DWH_IAM_ROLE_NAME, PolicyArn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName = DWH_IAM_ROLE_NAME)

**End of the process.**

<a id='conclusions'></a>
## 3. CONCLUSIONS




- This tutorial is a demo for creating a simple data warehouse ETL pipeline on AWS with Airflow. I hope it is helpful.
- Some of the materials are from the Data Engineering nanodegree program on Udacity.
- This notebook is part of my Medium blog post; check it out if you want to learn more.