# Exercise 3: Parallel ETL

In [1]:
%load_ext sql

In [2]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time

# STEP 1: Get the params of the created redshift cluster 
- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that give access to Redshift to read from S3

In [4]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))
KEY=config.get('AWS','key')
SECRET= config.get('AWS','secret')

DWH_DB= config.get("DWH","DWH_DB")
DWH_DB_USER= config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

In [17]:
# Retrieve cluster information
redshift = boto3.client('redshift',
                   region_name="us-west-2",
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                  )

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier",
                  "NodeType",
                  "ClusterStatus",
                  "MasterUsername",
                  "DBName",
                  "Endpoint",
                  "NumberOfNodes",
                  "VpcId",
                  "IamRoles"]
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterMetadata = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterMetadata)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-51c5ed29
7,NumberOfNodes,4
8,IamRoles,"[{'IamRoleArn': 'arn:aws:iam::728762156239:role/dwhRole', 'ApplyStatus': 'in-sync'}]"


In [16]:
# FILL IN THE REDSHIFT ENPOINT HERE
# e.g. DWH_ENDPOINT="redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com" 
DWH_ENDPOINT = myClusterMetadata["Endpoint"]["Address"]
print(DWH_ENDPOINT)
    
#FILL IN THE IAM ROLE ARN you got in step 2.2 of the previous exercise
#e.g DWH_ROLE_ARN="arn:aws:iam::988332130976:role/dwhRole"
DWH_ROLE_ARN = myClusterMetadata["IamRoles"][0]["IamRoleArn"]
print(DWH_ROLE_ARN)

dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com
arn:aws:iam::728762156239:role/dwhRole


# STEP 2: Connect to the Redshift Cluster

In [18]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [23]:
s3 = boto3.resource('s3',
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                  )

# TODO: Create udacity-labs bucket
sampleDbBucket = s3.create_bucket(
    ACL='private',
    Bucket='udacity-labs123123',
)

In [26]:
# TODO: Iterate over bucket objects starting with "ssbgz" and print
for obj in s3.Bucket("udacity-labs").objects.filter(Prefix='tickets'):
    print('{0}'.format(obj.key))

tickets/
tickets/full/
tickets/full/full.csv.gz
tickets/split/
tickets/split/part-00000-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00001-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00002-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00003-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00004-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00005-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00006-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00007-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00008-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00009-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz


# STEP 3: Create Tables

In [25]:
%%sql 
DROP TABLE IF EXISTS "sporting_event_ticket";
CREATE TABLE "sporting_event_ticket" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

# STEP 4: Load Partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/split/part` using your iam role credentials. Use gzip delimiter `;`.

In [27]:
%%time
qry = """
    COPY sporting_event_ticket FROM 's3://udacity-labs/tickets/split/part'
    CREDENTIALS 'aws_iam_role={}'
    GZIP delimiter ';' 
    COMPUPDATE OFF
    REGION 'us-west-2';
""".format(DWH_ROLE_ARN)
# COMPUPDATE OFF (Compression update off) wouldnt normally be declared, only for this demo

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.36 ms, sys: 0 ns, total: 4.36 ms
Wall time: 12.6 s


# STEP 5: Create Tables for the non-partitioned data

In [28]:
%%sql
DROP TABLE IF EXISTS "sporting_event_ticket_full";
CREATE TABLE "sporting_event_ticket_full" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);

 * postgresql://dwhuser:***@dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

# STEP 6: Load non-partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/full/full.csv.gz` using your iam role credentials. Use gzip delimiter `;`.

- Note how it's slower than loading partitioned data

In [29]:
%%time

qry = """
    COPY sporting_event_ticket_full FROM 's3://udacity-labs/tickets/full/full.csv.gz'
    CREDENTIALS 'aws_iam_role={}'
    GZIP delimiter ';' 
    COMPUPDATE OFF
    REGION 'us-west-2';
""".format(DWH_ROLE_ARN)
# COMPUPDATE OFF (Compression update off) wouldnt normally be declared, only for this demo

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cyrq5aqioub8.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.08 ms, sys: 51 µs, total: 4.14 ms
Wall time: 28.4 s
