In [1]:
import pandas as pd
import boto3
import json

## Load DWH parameters from `dwh.cfg`

In [None]:
import configparser

# Read AWS credentials and cluster settings from dwh.cfg; the `with` block
# guarantees the file handle is closed once the config has been parsed.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Show the settings as a sanity check. The password is masked so the secret
# is not persisted in the notebook's saved output.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })

## Create clients for IAM, EC2, S3 and Redshift

In [None]:
# Every AWS client in this notebook talks to the same region with the same
# credentials from dwh.cfg, so both are defined once here instead of repeated
# per client. (boto3 itself is imported in the first cell.)
AWS_REGION = "us-west-2"

aws_client_kwargs = dict(
    region_name=AWS_REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

ec2 = boto3.resource('ec2', **aws_client_kwargs)
s3 = boto3.resource('s3', **aws_client_kwargs)
iam = boto3.client('iam', **aws_client_kwargs)
redshift = boto3.client('redshift', **aws_client_kwargs)

## Check Bucket Contents

In [None]:
# List every object whose key starts with log_data in the udacity-dend bucket.
logDataBucket = s3.Bucket('udacity-dend')
print(logDataBucket)
log_objects = logDataBucket.objects.filter(Prefix="log_data")
for obj in log_objects:
    print(obj)

In [None]:
# List every object whose key starts with song_data in the udacity-dend bucket.
songDataBucket = s3.Bucket('udacity-dend')
print(songDataBucket)
song_objects = songDataBucket.objects.filter(Prefix="song_data")
for obj in song_objects:
    print(obj)

In [None]:
# Locate the JSONPaths file(s) whose keys start with log_json; the events COPY
# uses s3://udacity-dend/log_json_path.json. (Rebinds songDataBucket to the
# same bucket.)
songDataBucket = s3.Bucket('udacity-dend')
print(songDataBucket)
json_path_objects = songDataBucket.objects.filter(Prefix="log_json")
for obj in json_path_objects:
    print(obj)

## IAM Role

In [None]:
from botocore.exceptions import ClientError

# 1.1 Create the role that Redshift assumes to read from S3.
# Re-running the notebook hits "already exists", which is tolerated. Any other
# failure (credentials, permissions, ...) is re-raised because every later
# step needs this role.
try:
    print("1.1 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    print(e)


# 1.2 Grant the role read-only access to S3 (needed by the COPY commands).
print("1.2 Attaching Policy")
attach_status = iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)['ResponseMetadata']['HTTPStatusCode']
print(attach_status)

# 1.3 Look up the ARN; it is passed to create_cluster as IamRoles.
print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

## Create Redshift Cluster

In [None]:
# Request the Redshift cluster. Only "cluster already exists" is tolerated, so
# re-running the notebook is safe; any other error is surfaced instead of
# being printed and ignored.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # Identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        # Listen on the configured port so it matches the ingress rule and the
        # connection string, both of which are built from DWH_PORT.
        Port=int(DWH_PORT),

        # Roles (for S3 access)
        IamRoles=[roleArn]
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    print(e)

## Note the Endpoint and Role ARN

In [None]:
# create_cluster returns before the cluster is ready, and the Endpoint key may
# be missing while it is still being created. Block until the cluster is
# available, then fetch its properties (myClusterProps is also used by the
# security-group cell).
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

In [None]:
# S3 source locations for the staging loads, read from the [S3] config section.
LOG_PATH = config.get("S3", "LOG_DATA")
SONG_PATH = config.get("S3", "SONG_DATA")
LOG_JSONPATH = config.get("S3", "LOG_JSONPATH")

for s3_path in (LOG_PATH, SONG_PATH, LOG_JSONPATH):
    print(s3_path)

## Open an incoming TCP port to access the cluster endpoint

In [None]:
from botocore.exceptions import ClientError

# Allow inbound TCP on the Redshift port to the security group that is actually
# attached to the cluster (as reported by describe_clusters). Previously the
# rule went to whichever group the VPC happened to list first.
# NOTE(review): 0.0.0.0/0 opens the port to the entire internet; narrow the
# CIDR to your own IP range for anything beyond a short-lived exercise.
try:
    clusterSgId = myClusterProps['VpcSecurityGroups'][0]['VpcSecurityGroupId']
    clusterSg = ec2.SecurityGroup(clusterSgId)
    print(clusterSg)
    # The SecurityGroup resource supplies GroupId itself, so GroupName is not passed.
    clusterSg.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except ClientError as e:
    # Best-effort: e.g. the duplicate-rule error raised when this cell is re-run.
    print(e)

In [None]:
# Load the `sql` IPython extension, which provides the %sql line magic and the
# %%sql cell magic used by the cells below.
%load_ext sql

## Connect to the Redshift Cluster

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

## Testing Staging Create Scripts

In [None]:
%%sql

-- Recreate both staging tables from scratch so each COPY loads into empty tables.
DROP TABLE IF EXISTS songs_staging;
DROP TABLE IF EXISTS events_staging;

-- Raw song metadata. Coordinates use FLOAT because a bare NUMERIC in Redshift
-- means NUMERIC(18,0), which would round them to whole degrees.
CREATE TABLE IF NOT EXISTS songs_staging (
artist_id varchar, 
artist_latitude float, 
artist_location varchar, 
artist_longitude float,
artist_name varchar, 
duration float, 
num_songs int, 
song_id varchar, 
title varchar, 
year int);

-- Raw event-log records. length is FLOAT, like songs_staging.duration, because
-- a bare NUMERIC (scale 0) would drop its fractional part.
CREATE TABLE IF NOT EXISTS events_staging (
artist varchar, 
auth varchar, 
firstName varchar, 
gender character, 
itemInSession integer, 
lastName varchar,
length float, 
level varchar, 
location varchar, 
method varchar, 
page varchar, 
registration numeric,
sessionId integer, 
song varchar, 
status varchar, 
ts bigint, 
userAgent varchar, 
userId integer);

## Testing Staging Copy Scripts

In [None]:
%%sql

COPY songs_staging FROM 's3://udacity-dend/song_data'
    credentials 'aws_iam_role=arn:aws:iam::828209340962:role/dwhRole'
    format as json 'auto'
    region 'us-west-2';

In [None]:
%%sql

COPY events_staging FROM 's3://udacity-dend/log_data'
    credentials 'aws_iam_role=arn:aws:iam::828209340962:role/dwhRole'
    format as json 's3://udacity-dend/log_json_path.json'
    region 'us-west-2';

In [None]:
%%sql

-- Row count of the events staging table created and loaded above.
select count(*) from events_staging

In [None]:
%%sql

-- Row count of the songs staging table created and loaded above.
select count(*) from songs_staging

## Testing data load into fact and dimension tables

In [None]:
%%sql

-- Preview of the users dimension, built from song-play events only.
SELECT  DISTINCT se.userId AS user_id,
            se.firstName AS first_name,
            se.lastName AS last_name,
            se.gender AS gender,
            se.level AS level
    FROM events_staging AS se
    WHERE se.page = 'NextSong'
limit 10;

In [None]:
%%sql

-- Preview of the songs dimension from the song metadata staging table.
SELECT  DISTINCT ss.song_id AS song_id,
            ss.title AS title,
            ss.artist_id AS artist_id,
            ss.year AS year,
            ss.duration AS duration
    FROM songs_staging AS ss
limit 10;

In [None]:
%%sql

-- Preview of the artists dimension from the song metadata staging table.
SELECT  DISTINCT ss.artist_id AS artist_id,
            ss.artist_name AS name,
            ss.artist_location AS location,
            ss.artist_latitude AS latitude,
            ss.artist_longitude AS longitude
    FROM songs_staging AS ss
limit 10;

In [None]:
%%sql

-- Preview of the time dimension. ts is epoch milliseconds, converted to a
-- timestamp and then split into calendar parts. weekday uses dow (day of week),
-- not week.
SELECT  DISTINCT TIMESTAMP 'epoch' + se.ts/1000 * INTERVAL '1 second' AS start_time,
            EXTRACT(hour FROM start_time) AS hour,
            EXTRACT(day FROM start_time) AS day,
            EXTRACT(week FROM start_time) AS week,
            EXTRACT(month FROM start_time) AS month,
            EXTRACT(year FROM start_time) AS year,
            EXTRACT(dow FROM start_time) AS weekday
    FROM    events_staging AS se
    WHERE se.page = 'NextSong'
limit 10;

In [None]:
%%sql
-- Preview of the songplays fact. Events are matched to songs on BOTH title and
-- artist name. Joining on artist alone pairs each play with every song by that
-- artist.
SELECT  DISTINCT TIMESTAMP 'epoch' + se.ts/1000 * INTERVAL '1 second' AS start_time,
            se.userId AS user_id,
            se.level AS level,
            ss.song_id AS song_id,
            ss.artist_id AS artist_id,
            se.sessionId AS session_id,
            se.location AS location,
            se.userAgent AS user_agent
    FROM events_staging AS se
    JOIN songs_staging AS ss
        ON (se.song = ss.title AND se.artist = ss.artist_name)
    WHERE se.page = 'NextSong'
limit 10;

## Debug data-load errors in the Redshift staging tables


In [None]:
%%sql

-- Per-row load error details for the most recent COPY in this session.
SELECT d.query,
       substring(d.filename, 14, 20),
       d.line_number AS line,
       substring(d.value, 1, 16) AS value,
       substring(le.err_reason, 1, 48) AS err_reason
FROM stl_loaderror_detail d
JOIN stl_load_errors le
  ON d.query = le.query
WHERE d.query = pg_last_copy_id();

In [None]:
%%sql

-- Most recent load errors first, capped so the whole system table is not dumped
-- into the notebook output.
select * from stl_load_errors
order by starttime desc
limit 10

## Clean up the Redshift cluster resources

In [None]:
# Tear down the cluster; no final snapshot is taken, so its data is discarded.
redshift.delete_cluster(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True,
)

In [None]:
def prettyRedshiftProps(props):
    """Return selected fields of a cluster description as a Key/Value DataFrame.

    props: one element of redshift.describe_clusters()['Clusters'].
    Only keys present in `props` are included, in the order `props` yields them.
    """
    keys_to_show = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
                    "DBName", "Endpoint", "NumberOfNodes", "VpcId"]
    rows = [(key, value) for key, value in props.items() if key in keys_to_show]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])


# Check the cluster status after requesting deletion. prettyRedshiftProps was
# called here but never defined anywhere in the notebook (NameError).
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Remove the IAM role. The attached managed policy is detached first, since IAM
# refuses to delete a role that still has managed policies attached.
s3_read_only_policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn=s3_read_only_policy_arn)
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)