# Exercise 3: Parallel ETL

In [16]:
# Load the `sql` IPython extension; later cells rely on its %sql / %%sql magics.
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [17]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time

# STEP 1: Get the params of the created redshift cluster 
- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that gives Redshift access to read from S3

In [23]:
# Read AWS credentials and warehouse connection settings from dwh.cfg.
# The file is opened in a `with` block so the handle is closed as soon as it
# has been parsed; a missing file still raises, exactly as before.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# [AWS] section: access key pair used for the boto3 session.
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# [DWH] section: database name, login and port of the Redshift cluster.
# config.get() returns strings, so DWH_PORT is a str here.
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

In [24]:
# FILL IN THE REDSHIFT ENDPOINT HERE (host name of the cluster; used as `host` by the connection cells)
# e.g. DWH_ENDPOINT="redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com" 
DWH_ENDPOINT="dwhcluster.ciy0cawj5cot.us-west-2.redshift.amazonaws.com" 
    
# FILL IN THE IAM ROLE ARN you got in step 2.2 of the previous exercise.
# It is interpolated into the CREDENTIALS clause of the COPY statements in this notebook.
# e.g. DWH_ROLE_ARN="arn:aws:iam::988332130976:role/dwhRole"
DWH_ROLE_ARN="arn:aws:iam::455221240075:role/dwhRole"

# STEP 2: Connect to the Redshift Cluster

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

In [6]:
import redshift_connector

# Establish a connection using redshift_connector, from the settings read out of dwh.cfg.
# NOTE(review): this connection is not closed in this cell, and later cells
# rebind `conn` to fresh connections — confirm whether this cell is still needed.
conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DWH_DB,
    # DWH_PORT comes from config.get() and is therefore a str; the connector
    # documents `port` as an int, so convert explicitly.
    port=int(DWH_PORT),
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
)

In [25]:
# Open a boto3 session in us-west-2 with the key pair read from dwh.cfg,
# then derive a low-level S3 client from it.
session = boto3.Session(
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)
s3 = session.client("s3")

# List the objects of the udacity-labs bucket whose keys start with "tickets".
# With a client, the result is the raw response of list_objects_v2.
listing_request = {"Bucket": "udacity-labs", "Prefix": "tickets"}
sampleDbBucket = s3.list_objects_v2(**listing_request)

In [None]:
# Resource-style (object-oriented) listing.
# `sampleDbBucket` is the response of the client call list_objects_v2 and has
# no `.objects` attribute, so the collection API has to be reached through an
# S3 resource built from the same session instead.
tickets_bucket = session.resource("s3").Bucket("udacity-labs")
for obj in tickets_bucket.objects.filter(Prefix="tickets"):
    print(obj)

In [27]:
# Client-style listing: read the entries under 'Contents' from the response,
# falling back to an empty list when that entry is absent, and print each key.
listed_objects = sampleDbBucket.get('Contents', [])
for obj in listed_objects:
    print(obj["Key"])

tickets/
tickets/full/
tickets/full/full.csv.gz
tickets/split/
tickets/split/part-00000-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00001-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00002-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00003-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00004-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00005-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00006-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00007-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00008-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz
tickets/split/part-00009-d33afb94-b8af-407d-abd5-59c0ee8f5ee8-c000.csv.gz


# STEP 3: Create Tables

In [22]:
import redshift_connector

# Open a dedicated connection for this cell; it is always closed in `finally`.
conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DWH_DB,
    # DWH_PORT comes from config.get() and is therefore a str; the connector
    # documents `port` as an int, so convert explicitly.
    port=int(DWH_PORT),
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
)

try:
    with conn.cursor() as cursor:
        # Drop any previous copy first so the cell can be re-run.
        cursor.execute("""DROP TABLE IF EXISTS "sporting_event_ticket";""")
        # Target table of the partitioned COPY in STEP 4.
        cursor.execute("""
            CREATE TABLE "sporting_event_ticket" (
                "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
                "sporting_event_id" double precision NOT NULL,
                "sport_location_id" double precision NOT NULL,
                "seat_level" numeric(1,0) NOT NULL,
                "seat_section" character varying(15) NOT NULL,
                "seat_row" character varying(10) NOT NULL,
                "seat" character varying(10) NOT NULL,
                "ticketholder_id" double precision,
                "ticket_price" numeric(8,2) NOT NULL
            );
        """)
        # Commit both statements together.
        conn.commit()
        print("Table created successfully.")
except Exception as e:
    # Report the failure and roll back so no transaction is left open.
    print(f"Error: {e}")
    conn.rollback()
finally:
    conn.close()


Table created successfully.


In [None]:
%%sql 
DROP TABLE IF EXISTS "sporting_event_ticket";
-- The previous copy is dropped above so that this cell can be re-run.
-- This table is the target of the partitioned COPY in STEP 4.
CREATE TABLE "sporting_event_ticket" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    -- ticketholder_id is the only column declared without NOT NULL
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);

# STEP 4: Load Partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/split/part` using your IAM role credentials. The files are gzip-compressed and use `;` as the delimiter.

In [None]:
%%time
# Partitioned load through the %sql magic, timed for comparison with the
# single-file load in STEP 6.
# The FROM value is the common key prefix of the part-0000N files listed earlier.
# An f-string interpolates DWH_ROLE_ARN (preferred here over """...{}...""".format(DWH_ROLE_ARN)).
qry = f"""
    COPY sporting_event_ticket 
    FROM 's3://udacity-labs/tickets/split/part'
    CREDENTIALS 'aws_iam_role={DWH_ROLE_ARN}'
    GZIP
    DELIMITER ';'
    REGION 'us-west-2';
"""

%sql $qry

In [29]:
%%time
import redshift_connector

conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DWH_DB,  # Your database name
    port=DWH_PORT,       # Redshift default port
    user=DWH_DB_USER,  # Your Redshift username
    password=DWH_DB_PASSWORD  # Your Redshift password
)

try:
    with conn.cursor() as cursor:
        cursor.execute(f"""
            COPY sporting_event_ticket 
            FROM 's3://udacity-labs/tickets/split/part'
            CREDENTIALS 'aws_iam_role={DWH_ROLE_ARN}'
            GZIP
            DELIMITER ';'
            REGION 'us-west-2';
        """)
        conn.commit()
        print("successfully.")
except Exception as e:
    print(f"Error: {e}")
    conn.rollback()  # Rollback to clean the transaction state
finally:
    conn.close()


successfully.
CPU times: total: 62.5 ms
Wall time: 28.8 s


# STEP 5: Create Tables for the non-partitioned data

In [None]:
%%sql
DROP TABLE IF EXISTS "sporting_event_ticket_full";
-- The previous copy is dropped above so that this cell can be re-run.
-- Same column layout as sporting_event_ticket. This table is the target of the single-file COPY in STEP 6.
CREATE TABLE "sporting_event_ticket_full" (
    "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
    "sporting_event_id" double precision NOT NULL,
    "sport_location_id" double precision NOT NULL,
    "seat_level" numeric(1,0) NOT NULL,
    "seat_section" character varying(15) NOT NULL,
    "seat_row" character varying(10) NOT NULL,
    "seat" character varying(10) NOT NULL,
    -- ticketholder_id is the only column declared without NOT NULL
    "ticketholder_id" double precision,
    "ticket_price" numeric(8,2) NOT NULL
);

In [30]:
import redshift_connector

# Open a dedicated connection for this cell; it is always closed in `finally`.
conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DWH_DB,
    # DWH_PORT comes from config.get() and is therefore a str; the connector
    # documents `port` as an int, so convert explicitly.
    port=int(DWH_PORT),
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
)

try:
    with conn.cursor() as cursor:
        # Drop any previous copy first so the cell can be re-run.
        cursor.execute("""DROP TABLE IF EXISTS "sporting_event_ticket_full";""")
        # Target table of the single-file COPY in STEP 6; same columns as
        # sporting_event_ticket.
        cursor.execute("""
            CREATE TABLE "sporting_event_ticket_full" (
                "id" double precision DEFAULT nextval('sporting_event_ticket_seq') NOT NULL,
                "sporting_event_id" double precision NOT NULL,
                "sport_location_id" double precision NOT NULL,
                "seat_level" numeric(1,0) NOT NULL,
                "seat_section" character varying(15) NOT NULL,
                "seat_row" character varying(10) NOT NULL,
                "seat" character varying(10) NOT NULL,
                "ticketholder_id" double precision,
                "ticket_price" numeric(8,2) NOT NULL
            );
        """)
        # Commit both statements together.
        conn.commit()
        print("Table created successfully.")
except Exception as e:
    # Report the failure and roll back so no transaction is left open.
    print(f"Error: {e}")
    conn.rollback()
finally:
    conn.close()


Table created successfully.


# STEP 6: Load non-partitioned data into the cluster
Use the COPY command to load data from `s3://udacity-labs/tickets/full/full.csv.gz` using your IAM role credentials. The file is gzip-compressed and uses `;` as the delimiter.

- Note how it's slower than loading partitioned data

In [None]:
%%time
# Single-file load through the %sql magic, timed for comparison with the
# partitioned load in STEP 4. DWH_ROLE_ARN is interpolated via the f-string.

qry = f"""   
    COPY sporting_event_ticket_full 
    FROM 's3://udacity-labs/tickets/full/full.csv.gz'
    CREDENTIALS 'aws_iam_role={DWH_ROLE_ARN}'
    GZIP
    DELIMITER ';'
    REGION 'us-west-2';  
"""

%sql $qry

In [31]:
%%time
import redshift_connector

conn = redshift_connector.connect(
    host=DWH_ENDPOINT,
    database=DWH_DB,  # Your database name
    port=DWH_PORT,       # Redshift default port
    user=DWH_DB_USER,  # Your Redshift username
    password=DWH_DB_PASSWORD  # Your Redshift password
)

try:
    with conn.cursor() as cursor:
        cursor.execute(f"""   
            COPY sporting_event_ticket_full 
            FROM 's3://udacity-labs/tickets/full/full.csv.gz'
            CREDENTIALS 'aws_iam_role={DWH_ROLE_ARN}'
            GZIP
            DELIMITER ';'
            REGION 'us-west-2';  
        """)
        conn.commit()
        print("successfully.")
except Exception as e:
    print(f"Error: {e}")
    conn.rollback()  # Rollback to clean the transaction state
finally:
    conn.close()


successfully.
CPU times: total: 46.9 ms
Wall time: 36.1 s
