# Creating Redshift Cluster using the AWS python SDK 

In [1]:
from time import time
import matplotlib.pyplot as plt
import pandas as pd
import boto3
import json
import psycopg2
from create_tables import *
from etl import *


# STEP 0: Create AWS secret and access key

- Create a new IAM user in your AWS account
- Give it `AdministratorAccess`, From `Attach existing policies directly` Tab
- Take note of the access key and secret 
- Edit the file `dwh.cfg` in the same folder as this notebook and fill
<font color='red'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>
<font/>

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,songdb
5,DWH_DB_USER,student
6,DWH_DB_PASSWORD,Password123
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


In [3]:
import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

# STEP 1: IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [4]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::995978081066:role/dwhRole


# STEP 2:  Redshift Cluster

- Create a RedShift Cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [5]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

## 2.1 *Describe* the cluster to see its status
- run this block several times until the cluster status becomes `Available`

In [8]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,student
4,DBName,songdb
5,Endpoint,"{'Address': 'dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0594b37d
7,NumberOfNodes,4


<h2> 2.2 Take note of the cluster <font color='red'> endpoint and role ARN </font> </h2>


In [9]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::995978081066:role/dwhRole


## STEP 3: Open an incoming  TCP port to access the cluster ednpoint

In [10]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-2723e11c')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# STEP 4: cluster

In [12]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [13]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string


postgresql://student:Password123@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb


'Connected: student@songdb'

# STEP 6: Make sure you can connect to the S3

In [14]:
import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

sampleDbBucket =  s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="song_data"):
    print(obj)
    break
    
for obj in sampleDbBucket.objects.filter(Prefix="log_data"):
    print(obj)
    break

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')


# STEP 7: Create Tables for partitioned and non-partitioned data
- We are going to use a benchmarking data set common for benchmarking star schemas in data warehouses.
- The data is pre-loaded in a public bucket on the `us-west-2` region



## 7.1 Create staging table

In [15]:
%%sql 

DROP TABLE IF EXISTS staging_events;
DROP TABLE IF EXISTS staging_songs;


CREATE TABLE staging_events (
        artist              	    VARCHAR,
        auth                	    VARCHAR,
        first_name          	    VARCHAR,
        gender              	    CHAR,
        session_item        	    INT,
        last_name           	    VARCHAR,
        length              	    FLOAT,
        level               	    VARCHAR,
        location            	    VARCHAR,
        method              	    VARCHAR,
        page                	    VARCHAR,
        registration        	    BIGINT,
        session_id          	    INT,
        song                	    VARCHAR ,
        status              	    INT,
        ts                  	    BIGINT,
        user_agent          	    VARCHAR,
        user_id             	    INT
    );

CREATE TABLE staging_songs (
        artist_id           	    VARCHAR,
        artist_location     	    VARCHAR,
        artist_latitude     	    FLOAT,
        artist_longitude    	    FLOAT,
        artist_name         	    VARCHAR,
        duration            	    FLOAT,
        num_songs           	    INT,
        song_id             	    VARCHAR,
        title               	    VARCHAR,
        year                	    INT
    );


 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
Done.
Done.
Done.
Done.


[]

## 7.2 Copying tables

In [16]:
def loadTables(table):
    loadTimes = []
    
    SQL_EVENT_COPY = """
    COPY staging_events
    FROM {}
    IAM_ROLE {}
    JSON {};""".format(config.get('S3', 'LOG_DATA'), config.get('SECURITY', 'ROLE_ARN'), config.get('S3', 'LOG_JSONPATH'))
    
    SQL_SONG_COPY = """
    COPY staging_songs
    FROM {}
    IAM_ROLE {}
    FORMAT AS JSON 'auto';""".format(config.get('S3', 'SONG_DATA'), config.get('SECURITY', 'ROLE_ARN'))
    
    if table == 'staging_events' :
        
        
        print("======= LOADING TABLE: ** {} **  =======".format('TABLE staging_events'))
        print(SQL_EVENT_COPY)

        t0 = time()
        %sql $SQL_EVENT_COPY
        loadTime = time()-t0
        loadTimes.append(loadTime)
        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))
    
        return pd.DataFrame({"table":"staging_events", "loadtime":loadTimes})
    
    else :
        
        print("======= LOADING TABLE: ** {} ** =======".format('TABLE staging_songs'))
        print(SQL_SONG_COPY)

        t0 = time()
        %sql $SQL_SONG_COPY
        loadTime = time()-t0
        loadTimes.append(loadTime)
        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))

        return pd.DataFrame({"table":"staging_songs", "loadtime":loadTimes})

In [17]:

eventStats = loadTables("staging_events")
songStats = loadTables("staging_songs")
#-- Plotting of the timing results
#stats = eventStats.join(songStats)
#stats.plot.bar()
#plt.show()


    COPY staging_events
    FROM 's3://udacity-dend/log_data'
    IAM_ROLE 'arn:aws:iam::995978081066:role/dwhRole'
    JSON 's3://udacity-dend/log_json_path.json';
 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
Done.
=== DONE IN: 2.50 sec


    COPY staging_songs
    FROM 's3://udacity-dend/song_data'
    IAM_ROLE 'arn:aws:iam::995978081066:role/dwhRole'
    FORMAT AS JSON 'auto';
 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
Done.
=== DONE IN: 230.79 sec



## 7.3 Creating tables

In [18]:
%%sql 

DROP TABLE IF EXISTS songplays;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS time;

CREATE TABLE songplays (
        songplay_id         	    INT IDENTITY(0, 1)      	   NOT NULL     	  PRIMARY KEY,
        start_time          	    TIMESTAMP NOT NULL,
        user_id             	    VARCHAR                 	   NOT NULL,
        level               	    VARCHAR NOT NULL,
        song_id             	    VARCHAR NOT NULL,
        artist_id           	    VARCHAR NOT NULL,
        session_id          	    INT,
        location            	    VARCHAR,
        user_agent          	    VARCHAR
    );

CREATE TABLE users (
        user_id             	    VARCHAR                 	   NOT NULL     	  PRIMARY KEY,
        first_name          	    VARCHAR                 	   NOT NULL,
        last_name           	    VARCHAR                 	   NOT NULL,
        gender              	    CHAR,
        level               	    VARCHAR                 	   NOT NULL
    );

CREATE TABLE songs (
        song_id             	    VARCHAR                 	   NOT NULL     	  PRIMARY KEY,
        title               	    VARCHAR                 	   NOT NULL,
        artist_id           	    VARCHAR                 	   NOT NULL,
        year                	    INT,
        duration            	    INT
    );

CREATE TABLE artists (
        artist_id           	   VARCHAR                  	   NOT NULL     	  PRIMARY KEY,
        name                	   VARCHAR                  	   NOT NULL,
        location            	   VARCHAR,
        latitude            	   FLOAT,
        longitude           	   FLOAT
    );

CREATE TABLE time (
        start_time          	  TIMESTAMP                 	   NOT NULL     	  PRIMARY KEY,
        hour                	  INT                       	   NOT NULL,
        day                 	  INT                       	   NOT NULL,
        week                	  INT                       	   NOT NULL,
        month               	  INT                       	   NOT NULL,
        year                	  INT                       	   NOT NULL,
        weekday             	  INT                       	   NOT NULL
    );


 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

## 7.3 Copy from Staging to Final tables

In [20]:
%%sql

INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    SELECT TIMESTAMP 'epoch' + (ts / 1000) * INTERVAL '1 Second ',
           user_id,
           level,
           song_id,
           artist_id,
           session_id,
           location,
           user_agent
    FROM staging_events e
    JOIN staging_songs s ON (e.song = s.title AND e.artist = s.artist_name AND e.length = s.duration);


INSERT INTO users (user_id, first_name, last_name, gender, level)
    SELECT DISTINCT (user_id),
           first_name,
           last_name,
           gender,
           level
    FROM staging_events
    WHERE user_id IS NOT NULL AND page = 'NextSong';



INSERT INTO songs (song_id, title, artist_id, year, duration)
    SELECT DISTINCT(song_id),
           title,
           artist_id,
           year,
           duration 
    FROM staging_songs;



INSERT INTO artists (artist_id, name, location, latitude, longitude)
    SELECT DISTINCT (artist_id),
           artist_name,
           artist_location,
           artist_latitude,
           artist_longitude
    FROM staging_songs
    WHERE artist_id IS NOT NULL;



INSERT INTO time (start_time, hour, day, week, month, year, weekday)
    SELECT DISTINCT (TIMESTAMP 'epoch' + (ts / 1000) * INTERVAL '1 Second ') as ts_timestamp,
           EXTRACT(HOUR FROM ts_timestamp),
           EXTRACT(DAY FROM ts_timestamp),
           EXTRACT(WEEK FROM ts_timestamp),
           EXTRACT(MONTH FROM ts_timestamp),
           EXTRACT(YEAR FROM ts_timestamp),
           EXTRACT(DOW FROM ts_timestamp)
    FROM staging_events;




 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
319 rows affected.
104 rows affected.
14896 rows affected.
10025 rows affected.
8023 rows affected.


[]

# STEP 8: Analysing the tables

In [21]:
%sql SELECT COUNT(*) FROM songplays;

 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
1 rows affected.


count
319


In [22]:
%sql SELECT * FROM songplays LIMIT 3;

 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
3 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
52,2018-11-26 08:33:59,44,paid,SOVWWJW12A670206BE,AR3ZL6A1187B995B37,781,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0
116,2018-11-28 07:15:21,58,paid,SONQEYS12AF72AABC9,ARLY7P81187B9ACF4D,887,"Augusta-Richmond County, GA-SC","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"""
180,2018-11-09 19:35:24,36,paid,SOBJUKG12A58A7DCA8,AR9W3X91187FB3994C,392,"Janesville-Beloit, WI","""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""


In [23]:
%%sql
SELECT users.first_name, users.last_name, COUNT(songs.title) as songs_listened
FROM songplays
JOIN songs ON (songplays.song_id = songs.song_id)
JOIN artists ON (songplays.artist_id = artists.artist_id)
JOIN users ON (songplays.user_id = users.user_id)
GROUP BY users.first_name, users.last_name
ORDER BY songs_listened DESC;

 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
55 rows affected.


first_name,last_name,songs_listened
Chloe,Cuevas,50
Tegan,Levine,36
Kate,Harrell,28
Aleena,Kirby,23
Jacob,Klein,23
Mohammad,Rodriguez,18
Lily,Koch,15
Jacqueline,Lynch,15
Layla,Griffin,15
Matthew,Jones,13


In [24]:
%%sql
SELECT CONCAT(users.first_name, users.last_name) as user, songs.title as song, artists.name as artist, time.day, time.month, time.year
FROM songplays
JOIN songs ON (songplays.song_id = songs.song_id)
JOIN artists ON (songplays.artist_id = artists.artist_id)
JOIN users ON (songplays.user_id = users.user_id)
JOIN time ON (songplays.start_time = time.start_time)
LIMIT 10;

 * postgresql://student:***@dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com:5439/songdb
10 rows affected.


user,song,artist,day,month,year
TeganLevine,Up Up & Away,Kid Cudi,14,11,2018
TeganLevine,Up Up & Away,Kid Cudi / Kanye West / Common,14,11,2018
RylanGeorge,At Least It's Raining,Brand New Disaster,29,11,2018
SaraJohnson,Driver's Seat,Sniff 'n' The Tears,13,11,2018
JacobKlein,Sometime Around Midnight,The Airborne Toxic Event,12,11,2018
KevinArellano,Why,Basshunter,6,11,2018
KevinArellano,Why,Basshunter feat. DJ Mental Theos Bazzheadz,6,11,2018
LaylaGriffin,Timebomb (LP Version),Old 97's,19,11,2018
JacobKlein,You're The One,Dwight Yoakam,28,11,2018
KateHarrell,You're The One,Dwight Yoakam,13,11,2018


# STEP 5: Clean up your resources


In [25]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'student',
  'DBName': 'songdb',
  'Endpoint': {'Address': 'dwhcluster.cbwkpbxnim6m.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2021, 3, 18, 18, 21, 17, 275000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-2723e11c',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0594b37d',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'tue:11:30-tue:12:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': False,
  

- run this block several times until the cluster really deleted

In [27]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [28]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': 'c3c8f824-c3f7-40b0-8472-b8778495a8ce',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c3c8f824-c3f7-40b0-8472-b8778495a8ce',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Thu, 18 Mar 2021 18:40:29 GMT'},
  'RetryAttempts': 0}}