Create Redshift Cluster if not created already

Note: You will need to set your AWS access key (KEY) and secret key (SECRET) in the dwh.cfg file before running this notebook.

In [3]:
import configparser
import json
import boto3
import matplotlib.pyplot as plt
import pandas as pd
from time import time
from botocore.exceptions import ClientError

# Load AWS credentials and cluster settings from dwh.cfg.
# The context manager closes the file handle once the config is parsed.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

AWS_REGION             = "us-east-1"

DWH_CLUSTER_TYPE       = "multi-node"
DWH_NUM_NODES          = 4
DWH_NODE_TYPE          = "dc2.large"

DWH_CLUSTER_IDENTIFIER = "dwhCluster"
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_IAM_ROLE_NAME      = "dwhRole"

# Both clients use the same credentials and region.
iam = boto3.client('iam',
                   region_name=AWS_REGION,
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                   )

redshift = boto3.client('redshift',
                        region_name=AWS_REGION,
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET
                        )

# Look the role up by the configured constant rather than a repeated literal.
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

try:
    response = redshift.create_cluster(
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        # Create the cluster on the port clients will read from dwh.cfg,
        # instead of silently falling back to the service default.
        Port=int(DWH_PORT),

        #Roles (for s3 access)
        IamRoles=[roleArn]
    )
except ClientError as e:
    # An already-existing cluster is the expected "nothing to do" case for
    # this cell; any other AWS error is a real failure and must surface.
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


#STEP 1

Run create_tables.py to create database schema and connect to AWS resources

In [1]:
run create_tables.py


-------DROP The Tables------

stage_events is dropped.
stage_songs is dropped.
artist is dropped.
songs is dropped.
users is dropped.
songplays is dropped.
time is dropped.

-------CREATE The Tables------

stage_events is created.
stage_songs is created.
artist is created.
songs is created.
users is created.
songplays is created.
time is created.


#STEP 2

Run the etl.py to complete the ETL process

(It may take up to 30 minutes.)

In [2]:
run etl.py


-------COPY The Files Data to Staging Tables------

stage_events : data is copied.
stage_songs : data is copied.

-------INSERT The Data into Analytics Tables------

artist : data is inserted.
songs : data is inserted.
users : data is inserted.
songplays : data is inserted.
time : data is inserted.


-----Investigate Load Errors (if any)------

In [3]:
#%load_ext sql
import psycopg2
import os
import sqlalchemy
from sqlalchemy import *

# Re-read dwh.cfg and open a psycopg2 connection to the cluster.
config = configparser.ConfigParser()
config.read('dwh.cfg')

# The [CLUSTER] values are substituted positionally, so the section's keys
# must appear in dwh.cfg in the order: host, dbname, user, password, port.
dsn_template = "host={} dbname={} user={} password={} port={}"
cluster_settings = config['CLUSTER'].values()
conn = psycopg2.connect(dsn_template.format(*cluster_settings))
cur = conn.cursor()

print(conn)

<connection object at 0x7fe55e42d9c8; dsn: 'user=dwhuser password=xxx dbname=dwh host=dwhcluster.ci91joqghsgo.us-east-1.redshift.amazonaws.com port=5439', closed: 0>


In [4]:
#%%sql
# Print up to three rows from the STL_LOAD_ERRORS table.
try:
    cur.execute("select * from STL_LOAD_ERRORS limit 3;")
except psycopg2.Error as e:
    print("Error: select *")
    print(e)
    # A failed statement leaves the transaction aborted; roll back so the
    # connection stays usable for the following cells.
    cur.connection.rollback()
else:
    # Only read results when the query succeeded; fetching after a failed
    # execute would raise a second, misleading error.
    for row in cur:
        print(row)

(100, 5, 101717, datetime.datetime(2020, 9, 15, 3, 3, 43, 643790), 10197, 44168, 's3://udacity-dend/log_data/2018/11/2018-11-06-events.json                                                                                                                                                                                                       ', 1, '                                                                                                                               ', '          ', '          ', 0, '1541470364796                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                

#STEP 3

Check the results from the newly created tables

In [9]:
#%%sql
# Preview the first five rows of one table. To inspect another table, swap
# `time` in the query for one of: stage_events, stage_songs, songplays,
# users, songs, artists.
preview_query = "select * from time limit 5;"

try:
    cur.execute(preview_query)
except psycopg2.Error as e:
    print("Error: select *")
    print(e)
    # A failed statement leaves the transaction aborted; roll back so the
    # connection stays usable for the following cells.
    cur.connection.rollback()
else:
    # Only read results when the query succeeded; fetching after a failed
    # execute would raise a second, misleading error.
    for row in cur:
        print(row)

(datetime.datetime(2018, 11, 28, 13, 45, 0, 796000), 13, 28, 48, 11, 2018, 3)
(datetime.datetime(2018, 11, 14, 15, 47, 47, 796000), 15, 14, 46, 11, 2018, 3)
(datetime.datetime(2018, 11, 8, 10, 28, 27, 796000), 10, 8, 45, 11, 2018, 4)
(datetime.datetime(2018, 11, 16, 10, 43, 30, 796000), 10, 16, 46, 11, 2018, 5)
(datetime.datetime(2018, 11, 3, 19, 33, 39, 796000), 19, 3, 44, 11, 2018, 6)


Check the counts

In [11]:
# One row-count query per staging / analytics table.
stage_events_count = "select count(*) from stage_events"
stage_songs_count = "select count(*) from stage_songs"
songplays_count = "select count(*) from songplays"
users_count = "select count(*) from users"
songs_count = "select count(*) from songs"
artists_count = "select count(*) from artists"
time_count = "select count(*) from time"

# Labels printed before each count, in the same order as the queries below.
# NOTE(review): the label "artist" differs from the queried table "artists" - confirm which name is intended.
table_list = ["stage_events", "stage_songs", "songplays", "users", "songs", "artist", "time"]
count_table_queries = [
    stage_events_count,
    stage_songs_count,
    songplays_count,
    users_count,
    songs_count,
    artists_count,
    time_count,
]

# Pair each label with its query instead of tracking a manual index.
for table_label, count_sql in zip(table_list, count_table_queries):
    print("{} count: ".format(table_label))
    cur.execute(count_sql)
    print(cur.fetchone())

stage_events count: 
(8056,)
stage_songs count: 
(14896,)
songplays count: 
(333,)
users count: 
(104,)
songs count: 
(14896,)
artist count: 
(10025,)
time count: 
(333,)
