# Udacity Sparkify - Data Warehouse on AWS

## Step 1: Import and check configuration

### 1.1 Import packages

In [1]:
import boto3
import json
import configparser
import pandas as pd
from pathlib import Path
import time
import psycopg2


### 1.2. Load configuration parameters

In [52]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("CLUSTER","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("CLUSTER","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("CLUSTER","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("CLUSTER","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DWH_DB")
DWH_DB_USER            = config.get("CLUSTER","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DWH_DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("IAM_ROLE", "IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhsparkify
4,DWH_DB,songdb
5,DWH_DB_USER,awsuser
6,DWH_DB_PASSWORD,Aws123456
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,anhdtvRole


## Step 2: Create clients for IAM, EC2, S3 and Redshift

In [53]:
try:
    ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
    print("EC2 has been created")
except Exception as e:
    print(e)
try:
    s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )
    print("S3 has been created")
except Exception as e:
    print(e)    
try:
    iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name="us-west-2"
                  )
    print("IAM has been created")
except Exception as e:
    print(e)    
    
    
try:
    redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )
    print("Redshift has been created")
except Exception as e:
    print(e)    

EC2 has been created
S3 has been created
IAM has been created
Redshift has been created


### Create An IAM Role

In [54]:
try:
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{
                'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
    print(("role {} has been created").format(DWH_IAM_ROLE_NAME))
except Exception as e:
    print(e)

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                   PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess"
                  )['ResponseMetadata']['HTTPStatusCode']
    
print(("role {} has been attached policy").format(DWH_IAM_ROLE_NAME))

role anhdtvRole has been created
role anhdtvRole has been attached policy


In [55]:
# Get ARN
DWH_ROLE_ARN = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print ('roleArn is ' + DWH_ROLE_ARN)

roleArn is arn:aws:iam::004668495805:role/anhdtvRole


### Create Redshift Cluster

In [61]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[DWH_ROLE_ARN]  
    )
    print(("Cluster {} has been created").format(DWH_CLUSTER_IDENTIFIER))
except Exception as e:
    print(e)
   

Cluster dwhsparkify has been created


In [66]:
# Check Status
def prettyRedshiftProps(props):
    keysToShow = ["ClusterIdentifier",
                  "NodeType", 
                  "ClusterStatus",
                  "MasterUsername", 
                  "DBName",
                  "Endpoint", 
                  "NumberOfNodes", 
                  'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

Cluster = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(Cluster)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhsparkify
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,songdb
5,Endpoint,{'Address': 'dwhsparkify.cccscnsbg5ik.us-west-...
6,VpcId,vpc-0f260bb33946d0513
7,NumberOfNodes,4


## Step 3: Check configuration

In [67]:
DWH_CLUSTER_STATUS = Cluster['ClusterStatus']
numOfTries = 0
while (DWH_CLUSTER_STATUS != 'available'):
    numOfTries = numOfTries + 1
    print ('{} tries to get Endpoint'.format(numOfTries))
    time.sleep(15)
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    DWH_CLUSTER_STATUS = Cluster['ClusterStatus']

DWH_ENDPOINT = Cluster['Endpoint']['Address']
DWH_ROLE_ARN = Cluster['IamRoles'][0]['IamRoleArn']
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
print('Endpoint is {}'.format(DWH_ENDPOINT))

Endpoint is dwhsparkify.cccscnsbg5ik.us-west-2.redshift.amazonaws.com


In [68]:
def open_tcp_port(ec2, cluster, DWH_PORT):
    try:
        vpc = ec2.Vpc(id = cluster['VpcId'])
        defaultSg = list(vpc.security_groups.all())[0]
        print(defaultSg)
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )
    except Exception as e:
        print(e)
    return defaultSg.id

IAM_SECURITY_GROUP = open_tcp_port(ec2, myClusterProps, DWH_PORT)
IAM_SECURITY_GROUP

ec2.SecurityGroup(id='sg-0dc25ca764eee4684')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


'sg-0dc25ca764eee4684'

In [69]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
connect = psycopg2.connect(conn_string)
print(connect.status)
# %sql $conn_string
connect.close()

1


In [70]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

AWS_ACCESS_KEY          = config.get('AWS','KEY')
AWS_SECRET_ACCESS_KEY   = config.get('AWS','SECRET')

DWH_REGION              = config.get("CLUSTER","DWH_REGION")
DWH_CLUSTER_TYPE        = config.get("CLUSTER","DWH_CLUSTER_TYPE")
DWH_NUM_NODES           = config.get("CLUSTER","DWH_NUM_NODES")
DWH_NODE_TYPE           = config.get("CLUSTER","DWH_NODE_TYPE")
DWH_HOST                = config.get("CLUSTER","DWH_HOST")

DWH_CLUSTER_IDENTIFIER  = config.get("CLUSTER","DWH_CLUSTER_IDENTIFIER")
DWH_DB                  = config.get("CLUSTER","DWH_DB")
DWH_DB_USER             = config.get("CLUSTER","DWH_DB_USER")
DWH_DB_PASSWORD         = config.get("CLUSTER","DWH_DB_PASSWORD")
DWH_PORT                = config.get("CLUSTER","DWH_PORT")

IAM_ROLE_NAME           = config.get("IAM_ROLE", "IAM_ROLE_NAME")
IAM_ROLE_ARN            = config.get("IAM_ROLE", "IAM_ROLE_ARN")
IAM_SECURITY_GROUP      = config.get("IAM_ROLE", "IAM_SECURITY_GROUP")

pd.DataFrame({"Param":
                  ["DWH_REGION",
                   "DWH_CLUSTER_TYPE", 
                   "DWH_NUM_NODES", 
                   "DWH_NODE_TYPE", 
                   "DWH_CLUSTER_IDENTIFIER", 
                   "DWH_HOST", 
                   "DWH_DB", 
                   "DWH_DB_USER", 
                   "DWH_DB_PASSWORD", 
                   "DWH_PORT", 
                   "IAM_ROLE_NAME",
                   "IAM_ROLE_ARN",
                   "IAM_SECURITY_GROUP"],
              "Value":
                  [DWH_REGION, 
                   DWH_CLUSTER_TYPE, 
                   DWH_NUM_NODES, 
                   DWH_NODE_TYPE, 
                   DWH_CLUSTER_IDENTIFIER, 
                   DWH_HOST, 
                   DWH_DB, 
                   DWH_DB_USER, 
                   DWH_DB_PASSWORD, 
                   DWH_PORT, 
                   IAM_ROLE_NAME,
                   IAM_ROLE_ARN,
                   IAM_SECURITY_GROUP]
             })

Unnamed: 0,Param,Value
0,DWH_REGION,us-west-2
1,DWH_CLUSTER_TYPE,multi-node
2,DWH_NUM_NODES,4
3,DWH_NODE_TYPE,dc2.large
4,DWH_CLUSTER_IDENTIFIER,dwhsparkify
5,DWH_HOST,dwhsparkify.cccscnsbg5ik.us-west-2.redshift.am...
6,DWH_DB,songdb
7,DWH_DB_USER,awsuser
8,DWH_DB_PASSWORD,Aws123456
9,DWH_PORT,5439


## Step 4: Create Tables

In [79]:
!python create_tables.py

## Step 5: Run ETL

In [80]:
!python etl.py


    copy staging_events from 's3://udacity-dend/log_data'
    credentials 'aws_iam_role=arn:aws:iam::004668495805:role/anhdtvRole'
    json 's3://udacity-dend/log_json_path.json'
    region 'us-west-2';


    copy staging_songs from 's3://udacity-dend/song_data'
    credentials 'aws_iam_role=arn:aws:iam::004668495805:role/anhdtvRole'
    json 'auto'
    region 'us-west-2';



## Step 6: Some queries to analyst data

### 6.1. Import package for testing

In [81]:
import matplotlib.pyplot as plt
import numpy as np
import psycopg2

In [82]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [112]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(
    DWH_HOST,
    DWH_DB,
    DWH_DB_USER,
    DWH_DB_PASSWORD,
    DWH_PORT
))

cur = conn.cursor()


### 6.2. Top 10 most active users

In [113]:
query = """
    SELECT u.user_id, u.first_name, u.last_name, COUNT(sp.songplay_id) total_songplay
    FROM users u
    JOIN songplays sp ON u.user_id = sp.user_id
    GROUP BY u.user_id, u.first_name, u.last_name
    ORDER BY total_songplay DESC, user_id
    LIMIT 10
"""
cur.execute(query)
pd.DataFrame(cur.fetchall(), columns=("user_id", "first_name", "last_name", "total_songplay"))

Unnamed: 0,user_id,first_name,last_name,total_songplay
0,49,Chloe,Cuevas,689
1,80,Tegan,Levine,665
2,97,Kate,Harrell,557
3,15,Lily,Koch,463
4,44,Aleena,Kirby,397
5,29,Jacqueline,Lynch,346
6,24,Layla,Griffin,321
7,73,Jacob,Klein,289
8,88,Mohammad,Rodriguez,270
9,36,Matthew,Jones,248


### 6.3. Top artist with the most played song

In [96]:
query = """
    SELECT a.artist_id, a.name artist_name, COUNT(sp.songplay_id) total_songplays
    FROM songplays sp
    JOIN artists a ON sp.artist_id = a.artist_id
    GROUP BY a.artist_id, a.name
    ORDER BY total_songplays DESC
    LIMIT 10;
"""
cur.execute(query)
pd.DataFrame(cur.fetchall(), columns=("artist_id", "artist_name", "total_songplay"))

Unnamed: 0,artist_id,artist_name,total_songplay
0,AR5E44Z1187B9A1D74,Dwight Yoakam,37
1,ARD46C811C8A414F3F,Kid Cudi,10
2,ARD46C811C8A414F3F,Kid Cudi / Kanye West / Common,10
3,AR37SX11187FB3E164,Ron Carter,9
4,AR5EYTL1187B98EDA0,Lonnie Gordon,9
5,ARKQQZA12086C116FC,B.o.B,8
6,ARPDVPJ1187B9ADBE9,Usher featuring Jermaine Dupri,6
7,ARR3ONV1187B9A2F59,Muse,6
8,ARPDVPJ1187B9ADBE9,Usher,6
9,ARM0P6Z1187FB4D466,Arctic Monkeys,5


### 6.4. Top song with the highest number of played song

In [103]:
query = """
    SELECT s.song_id, s.title, a.name artist_name, COUNT(sp.songplay_id) total_songplay
    FROM songplays sp
    JOIN songs s  ON sp.song_id = s.song_id
    JOIN artists a ON sp.artist_id = a.artist_id
    GROUP BY s.song_id, s.title, a.name
    ORDER BY total_songplay DESC
    LIMIT 10;
"""
cur.execute(query)
pd.DataFrame(cur.fetchall(), columns=("song_id", "title", "artist_name", "total_songplays"))

Unnamed: 0,song_id,title,artist_name,total_songplays
0,SOBONKR12A58A7A7E0,You're The One,Dwight Yoakam,37
1,SOHTKMO12AB01843B0,Catch You Baby (Steve Pitron & Max Sanna Radio...,Lonnie Gordon,9
2,SOUNZHU12A8AE47481,I CAN'T GET STARTED,Ron Carter,9
3,SOULTKQ12AB018A183,Nothin' On You [feat. Bruno Mars] (Album Version),B.o.B,8
4,SOLZOBD12AB0185720,Hey Daddy (Daddy's Home),Usher,6
5,SOLZOBD12AB0185720,Hey Daddy (Daddy's Home),Usher featuring Jermaine Dupri,6
6,SOARUPP12AB01842E0,Up Up & Away,Kid Cudi / Kanye West / Common,5
7,SOTNHIP12AB0183131,Make Her Say,Kid Cudi,5
8,SOARUPP12AB01842E0,Up Up & Away,Kid Cudi,5
9,SOTNHIP12AB0183131,Make Her Say,Kid Cudi / Kanye West / Common,5


In [117]:
conn.close()

## Step 7: Delete resources

In [None]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

In [None]:
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)