#  Cloud Data Warehouse _ Music Streaming Business 

Note:
This ipynd file contain 4 parts

PART 1 _ Create IAM role and attach policy
    1.1 Parse 'dwh_1.cfg' file
    1.2 Create Resources & Clients
    1.3 Create IAM role
Part 2 _ Create Redshift Cluster
    2.1 Create & Validate Cluster
    2.2 Set Security Group and CIDR
Part 3 _ ETL
    3.1 Create Staging Tables & New Schema
    3.2 ETL (Load,Transform,Insert)
Part 4 _ HOUSE KEEPING

Recommendation:
If you have AMAZON AWS account, and prefer a Iac approach, you can follow below steps, with only your AccessKey (KEY and SECRET).
If you already manually created role, policy, set security group and redshift clusters. you can jump to PART 3 and start from there.


In [1]:
import pandas as pd
import boto3
import json
import configparser
from botocore.exceptions import ClientError
from time import time


# PART 1 _ Create IAM role and attach policy

## 1.1 Parse 'dwh_1.cfg' file

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

# admin accesskey
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')

# configuration for creating redshift cluster
DWH_CLUSTER_TYPE = config.get('DWH','DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH','DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH','DWH_NODE_TYPE')

DWH_CLUSTER_IDENTIFIER = config.get('DWH','DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH','DWH_DB')
DWH_DB_USER = config.get('DWH','DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH','DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH','DWH_PORT')

# name of IAM role, which shall also have S3readonly policy
DWH_IAM_ROLE_NAME = config.get('DWH','DWH_IAM_ROLE_NAME')


In [3]:
# print .cfg content to check
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd123
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


## 1.2 Create Resources & Clients

In [None]:
ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET,
                    region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

print('---Resources and Clients Created.---')

## 1.3 Create IAM role

In [None]:
# from botocore.exceptions import ClientError

# Create IAM role
try:
    print("Creating a new IAM Role...") 
    dwhRole1 = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
               'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)

print('IAM role created.')

In [None]:
# attaching policy AmazonS3ReadOnlyAccess

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

In [None]:
print(roleArn)

# Part 2 _ Create Redshift Cluster

## 2.1 Create & Validate Cluster

In [None]:
try:
    response = redshift.create_cluster(        
        #cluster_config
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [None]:
# run below scripts to check status of cluster just created.

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

<span style="color:red">WARNING: Run below scripts ONLY after you check the status of the cluster is "AVAILABLE"</span>     

In [None]:
# DWH_ENDPOINT will be used to construct the CONN_String. to connect to the cluster
# DWH_ROLE_ARN  will be the credentials for Copy Command: copy data from S3 bucket

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

COPY DWH_ENDPOINT & DWH_ROLE_ARN  to .cfg file

## 2.2 Set Security Group and CIDR

In [None]:
# Here use the default security group
# set CIDR to '0.0.0.0/0', not limit to a subnet

try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)
    
# VPC security group might already exist. That's fine to see error in this case.

# Part 3 _ ETL

Run "Create_table.py" 
This will create: 
a. Two staging table for copying source data from S3; 
b. New schema, 5 tables

## 3.1 Create Staging Tables & New Schema

In [None]:
# it is OK to uncomment and run .py file directly. 


In [None]:
%run -i create_tables.py

### CHECK: whether table created or not

In [None]:
%load_ext sql
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

In [None]:
%sql SELECT DISTINCT schemaname, tablename FROM "pg_table_def" WHERE schemaname='public';
# it should show 7 table names

In [None]:
%sql SELECT * FROM "pg_table_def" WHERE tablename='staging_events';
# check table

## 3.2 ETL (Load,Transform,Insert)

In [None]:
%run etl.py

### CHECK: how many data been inserted into each table.

In [None]:
%%sql 
SELECT CONCAT((SELECT count(*) FROM staging_events),' rows inserted into staging_events') AS "ETL job result"
UNION
SELECT CONCAT((SELECT count(*) FROM staging_songs),' rows inserted into staging_songs')
UNION
SELECT CONCAT((SELECT count(*) FROM artists),' rows inserted into artists')
UNION
SELECT CONCAT((SELECT count(*) FROM songs),' rows inserted into songs')
UNION
SELECT CONCAT((SELECT count(*) FROM users),' rows inserted into users')
UNION
SELECT CONCAT((SELECT count(*) FROM time),' rows inserted into time')
UNION
SELECT CONCAT((SELECT count(*) FROM songplays),' rows inserted into songplays')
; 

In [None]:
%sql SELECT * FROM songplays where song_id IS NOT NULL LIMIT 10;

## Part 4 _ HOUSE KEEPING

In [None]:
%sql vacuum;

In [None]:
%sql analyze;

In [None]:
%sql DROP TABLE IF EXISTS "staging_events";
%sql DROP TABLE IF EXISTS "staging_songs";
%sql DROP TABLE IF EXISTS "songplays";
%sql DROP TABLE IF EXISTS "users";
%sql DROP TABLE IF EXISTS "songs";
%sql DROP TABLE IF EXISTS "artists";
%sql DROP TABLE IF EXISTS "time";

In [None]:
# delete redshift cluster

redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

In [None]:
# check status of cluster
# deleting cluster might take several minutes, run multiple and make sure it's beed deleted

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# delete the IAM Role created: detach role policy, then delete role

iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
