# Sparkify Control Point

In [1]:
# First things first, we have to create a redshift cluster for our project on AWS
# Here, we'd be using IaC to proceed with the processes

# importing boto3, AWS python SDK
import boto3
from botocore.exceptions import ClientError

import configparser
import json
import pandas as pd

## Configuration

In [2]:
# Extracting config variables 
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY = config.get('USER', 'KEY')
SECRET = config.get('USER', 'SECRET')

In [3]:
DWH_ROLE_NAME = config.get('DWH', 'DWH_ROLE_NAME')
DWH_DB_NAME = config.get('DWH', 'DWH_DB_NAME')
DWH_CLUSTER_ID = config.get('DWH', 'DWH_CLUSTER_ID')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')
DWH_USER_NAME = config.get('DWH', 'DWH_USER_NAME')
DWH_USER_PASSWORD = config.get('DWH', 'DWH_USER_PASSWORD')
DWH_NUMBER_0F_NODES = int(config.get('DWH', 'DWH_NUMBER_0F_NODES'))
DWH_PORT = int(config.get('DWH', 'DWH_PORT'))

variables = pd.DataFrame({
    'keys':['DWH_ROLE_NAME', 'DWH_DB_NAME', 'DWH_CLUSTER_ID', 'DWH_NODE_TYPE', 'DWH_NUMBER_0F_NODES', 'DWH_PORT'], 
    'values':[DWH_ROLE_NAME, DWH_DB_NAME, DWH_CLUSTER_ID, DWH_NODE_TYPE, DWH_NUMBER_0F_NODES, DWH_PORT]
})

variables

Unnamed: 0,keys,values
0,DWH_ROLE_NAME,redshift_s3_readonly
1,DWH_DB_NAME,sparkifydb
2,DWH_CLUSTER_ID,sparkify-cluster
3,DWH_NODE_TYPE,dc2.large
4,DWH_NUMBER_0F_NODES,4
5,DWH_PORT,5439


### Create IAM role for Redshift cluster
This role will grant redshift AmazonS3ReadOnlyAccess

In [4]:
# Instantiating IAM client
iam = boto3.client('iam', region_name='us-east-2', aws_access_key_id=KEY, aws_secret_access_key=SECRET)

try:
    print('Creating IAM role for Redshift cluster...')
    iam_role = iam.create_role(
        RoleName=DWH_ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps({
            'Statement': [{
                'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'}
            }],
            'Version': '2012-10-17'
        }),
        Description='Allows Redshift cluster to call AWS services on you behalf',
    )
    print('Role creation successful!')
    
    
    iam.attach_role_policy(
        RoleName=DWH_ROLE_NAME,
        PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
    )['ResponseMetadata']['HTTPStatusCode']
    
    print('Role policy attached successfully!')
except Exception as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        iam_role = iam.get_role(RoleName=DWH_ROLE_NAME)
        print('Role gotten')
    else:
        print(e)

Creating IAM role for Redshift cluster...
Role gotten


In [7]:
role_arn = iam_role['Role']['Arn']
role_arn

### Build the Redshift cluster

In [6]:
# Instantiating redshift client
redshift = boto3.client('redshift', region_name='us-east-2', aws_access_key_id=KEY, aws_secret_access_key=SECRET)

try:
    print('Creating Redshift cluster...')
    redshift_cluster = redshift.create_cluster(
        DBName=DWH_DB_NAME,
        ClusterIdentifier=DWH_CLUSTER_ID,
        NodeType=DWH_NODE_TYPE,
        MasterUsername=DWH_USER_NAME,
        MasterUserPassword=DWH_USER_PASSWORD,
        NumberOfNodes=DWH_NUMBER_0F_NODES,
        IamRoles=[
            role_arn,
        ]
    )
    print('Redshift cluster creation successful!')
except Exception as e:
    print(e)

Creating Redshift cluster...
Redshift cluster creation successful!


In [8]:
# Checking cluster availability status
cluster_props = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_ID)['Clusters'][0]
cluster_props['ClusterAvailabilityStatus'], cluster_props['ClusterStatus']

In [9]:
# Obtaining cluster endpoint
DWH_ENDPOINT = cluster_props['Endpoint']['Address']
DWH_PORT = int(cluster_props['Endpoint']['Port'])
print('Endpoint: {}\nPort: {}'.format(DWH_ENDPOINT, DWH_PORT))

In [10]:
# cluster_vars = pd.DataFrame({
#     'keys':['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'Endpoint:Address', 'Endpoint:Port', 'IamRole', 'Vpc', 'NumberOfNodes'], 
#     'values':[cluster_props['ClusterIdentifier'], cluster_props['NodeType'], cluster_props['ClusterStatus'], 
#               cluster_props['Endpoint']['Address'], cluster_props['Endpoint']['Port'], cluster_props['IamRoles'][0]['IamRoleArn'],
#               cluster_props['VpcId'], cluster_props['NumberOfNodes']]
# })

# cluster_vars

### Open Incomming TCP port to access the cluster endpoint

In [12]:
# get an ec2 resourse
ec2 = boto3.resource('ec2', region_name='us-east-2', aws_access_key_id=KEY, aws_secret_access_key=SECRET)

In [12]:
try: 
    vpc = ec2.Vpc(id=cluster_props['VpcId'])
    default_sg = list(vpc.security_groups.all())[0]
    print(default_sg)
    
    default_sg.authorize_ingress(
        GroupName=default_sg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=DWH_PORT,
        ToPort=DWH_PORT
    )
except Exception as e:
    print(e)

## ETL

In [13]:
%load_ext sql

In [11]:
conn_string = 'postgresql://{}:{}@{}:{}/{}'.format(DWH_USER_NAME, DWH_USER_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB_NAME)
%sql $conn_string

In [71]:
%%sql
DROP TABLE IF EXISTS "staging_events";
DROP TABLE IF EXISTS "staging_songs";

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.
Done.


[]

In [18]:
%%sql
DROP TABLE IF EXISTS "staging_events";
CREATE TABLE IF NOT EXISTS "staging_events" (
    "artist" VARCHAR,
    "auth" VARCHAR,
    "firstName" VARCHAR,
    "gender" VARCHAR,
    "itemInSession" SMALLINT,
    "lastName" VARCHAR,
    "length" REAL,
    "level" VARCHAR,
    "location" VARCHAR,
    "method" VARCHAR,
    "page" VARCHAR,
    "registration" DOUBLE PRECISION,
    "sessionId" SMALLINT,
    "song" VARCHAR,
    "status" SMALLINT,
    "ts" BIGINT,
    "userAgent" VARCHAR,
    "userId" SMALLINT
);

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.
Done.


[]

In [19]:
%%sql
COPY "staging_events" FROM 's3://udacity-dend/log-data'
CREDENTIALS 'aws_iam_role=arn:aws:iam::451737047229:role/redshift_s3_readonly'
COMPUPDATE OFF REGION 'us-west-2'
JSON 's3://udacity-dend/log_json_path.json';

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.


[]

In [15]:
%%sql
DROP TABLE IF EXISTS "staging_songs";
CREATE TABLE IF NOT EXISTS "staging_songs" (
    "artist_id" VARCHAR,
    "artist_latitude" DECIMAL(18,12),
    "artist_location" VARCHAR(MAX),
    "artist_longitude" DECIMAL(18,12),
    "artist_name" VARCHAR(MAX),
    "duration" DOUBLE PRECISION, 
    "num_songs" SMALLINT,
    "song_id" VARCHAR,
    "title" VARCHAR(MAX),
    "year" SMALLINT
);

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.
Done.


[]

In [76]:
%%sql
COPY "staging_songs" FROM 's3://udacity-dend/song-data'
CREDENTIALS 'aws_iam_role=arn:aws:iam::451737047229:role/redshift_s3_readonly'
COMPUPDATE OFF REGION 'us-west-2'
JSON 'auto ignorecase';

In [45]:
%%sql
SELECT * 
FROM staging_songs_2
LIMIT 10;

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
0 rows affected.


artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year


In [52]:
%%sql
DROP TABLE IF EXISTS "staging_songs_2";
CREATE TABLE IF NOT EXISTS "staging_songs_2" (
    "artist_id" VARCHAR,
    "artist_latitude" DECIMAL(18,12),
    "artist_location" VARCHAR(MAX),
    "artist_longitude" DECIMAL(18,12),
    "artist_name" VARCHAR(MAX),
    "duration" DOUBLE PRECISION, 
    "num_songs" SMALLINT,
    "song_id" VARCHAR,
    "title" VARCHAR(MAX),
    "year" SMALLINT
);

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.
Done.


[]

In [54]:
%%sql
COPY "staging_songs_2" FROM 's3://udacity-dend/song_data'
CREDENTIALS 'aws_iam_role=arn:aws:iam::451737047229:role/redshift_s3_readonly'
COMPUPDATE OFF REGION 'us-west-2'
JSON 'auto ignorecase'

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.


[]

In [186]:
%%sql
WITH main_table AS (
    SELECT *
    FROM staging_events
    WHERE page = 'NextSong'
), timestamp_table AS (
    SELECT ts, timestamp 'epoch' + (ts/1000) * interval '1 second' AS t_stamp
    FROM main_table
)

SELECT t_stamp, date_part('weekday', t_stamp) AS weekday
FROM timestamp_table
ORDER BY 1 DESC
LIMIT 10;

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
10 rows affected.


t_stamp,weekday
2018-11-30 19:54:24,5.0
2018-11-30 18:51:24,5.0
2018-11-30 18:47:58,5.0
2018-11-30 18:44:36,5.0
2018-11-30 18:40:05,5.0
2018-11-30 18:39:22,5.0
2018-11-30 18:36:25,5.0
2018-11-30 18:35:19,5.0
2018-11-30 18:32:46,5.0
2018-11-30 18:31:34,5.0


In [272]:
%%sql
DROP TABLE IF EXISTS "time";
CREATE TABLE IF NOT EXISTS "time" (
    start_time BIGINT NOT NULL SORTKEY,
    hour SMALLINT NOT NULL,
    day SMALLINT NOT NULL,
    week SMALLINT NOT NULL,
    month SMALLINT NOT NULL,
    year SMALLINT NOT NULL,
    weekday SMALLINT NOT NULL
)
DISTSTYLE ALL;

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
Done.
Done.


[]

In [274]:
%%sql
INSERT INTO "time" ("start_time", "hour", "day", "week", "month", "year", "weekday")
SELECT DISTINCT ts AS start_time,
        DATE_PART('hour', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS hour,
        DATE_PART('day', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS day,
        DATE_PART('week', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS week,
        DATE_PART('month', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS month,
        DATE_PART('year', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS year,
        DATE_PART('weekday', TIMESTAMP 'epoch' + (ts/1000) * INTERVAL '1 second') AS weekday
FROM staging_events;

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
8023 rows affected.


[]

In [276]:
%%sql
SELECT *
FROM time
ORDER BY start_time DESC
LIMIT 10;

 * postgresql://mike:***@sparkify-cluster.csc0efk6rxxc.us-east-2.redshift.amazonaws.com:5439/sparkifydb
10 rows affected.


start_time,hour,day,week,month,year,weekday
1543607664796,19,30,48,11,2018,5
1543603993796,18,30,48,11,2018,5
1543603884796,18,30,48,11,2018,5
1543603678796,18,30,48,11,2018,5
1543603476796,18,30,48,11,2018,5
1543603205796,18,30,48,11,2018,5
1543603162796,18,30,48,11,2018,5
1543602985796,18,30,48,11,2018,5
1543602936796,18,30,48,11,2018,5
1543602919796,18,30,48,11,2018,5


In [38]:
s3 = boto3.resource('s3', aws_access_key_id=KEY, aws_secret_access_key=SECRET)
u_bucker = s3.Bucket('udacity-dend')

In [39]:
count = 0
total_size = 0
for obj in u_bucker.objects.filter(Prefix='song-data'):
    count += 1
    total_size += obj.size

In [40]:
count

385253

In [41]:
total_size

103473867

## Cleaning up

In [278]:
# response = redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_ID, SkipFinalClusterSnapshot=True)
# response

In [None]:
# iam.detach_role_policy(RoleName=DWH_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
# iam.delete_role(RoleName=DWH_ROLE_NAME)