In [None]:
import json
import os

import boto3
import pandas as pd
from botocore.exceptions import ClientError

In [None]:
# AWS credentials are read from the environment instead of being typed into
# the notebook, so they never end up in a saved or shared copy of the file.
# An unset (or empty) variable yields None, in which case boto3 falls back to
# its default credential chain (shared credentials file, instance profile, ...).
KEY = os.environ.get('AWS_ACCESS_KEY_ID') or None
SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY') or None

In [None]:
# IAM client, used below to create the role and at the end of the notebook
# to detach its policy and delete it.
iam = boto3.client(
    'iam',
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [None]:
# Name of the IAM role that is created, attached to the cluster and finally deleted.
ROLE_NAME = "project_3_role_name"

In [None]:
# 1.1 Create the IAM role whose trust policy lets the Redshift service assume it.
# Re-running the cell is safe: an already-existing role is reported and kept.
# Any other failure (bad credentials, missing permissions, ...) is re-raised
# instead of being printed and ignored, so later cells do not run against a
# role that was never created.
try:
    Project_3_Role = iam.create_role(
        Path='/',
        RoleName=ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print(e)
    

In [None]:
# AWS-managed policy that is attached to (and later detached from) the role.
POLICY_ARN = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"

In [None]:
# Attach the managed policy to the role; the cell displays the HTTP status
# code of the API response.
attach_response = iam.attach_role_policy(PolicyArn=POLICY_ARN, RoleName=ROLE_NAME)
attach_response['ResponseMetadata']['HTTPStatusCode']

In [None]:
# Redshift client, used to create, describe and finally delete the cluster.
redshift = boto3.client(
    'redshift',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

In [None]:
# --- Cluster hardware ---
CLUSTER_TYPE = 'multi-node'
NODE_TYPE = 'dc2.large'
NUM_NODES = '4'  # kept as a string; converted with int() where the cluster is created
CLUSTER_IDENTIFIER = 'Project-3-Cluster'

# --- Database identifiers & credentials ---
DB = 'project_3_db'
DB_USER = 'project_3_user'
# Prefer a password supplied through the REDSHIFT_DB_PASSWORD environment
# variable so the real secret does not have to live in the notebook. The
# original placeholder is kept as the fallback so the notebook still runs
# unchanged when the variable is not set.
DB_PASSWORD = os.environ.get('REDSHIFT_DB_PASSWORD', 'Passw0rd')

# ARN of the IAM role; it is attached to the cluster and used by the COPY commands.
ROLE_ARN = iam.get_role(RoleName=ROLE_NAME)['Role']['Arn']

In [None]:
# Display the role ARN that is handed to create_cluster and to the COPY commands.
ROLE_ARN

In [None]:
# Request creation of the Redshift cluster described by the constants above.
# Re-running the cell is safe: an already-existing cluster is reported and
# kept. Any other failure is re-raised rather than printed and ignored, so the
# notebook stops at the real cause instead of failing later on a missing cluster.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=CLUSTER_TYPE,
        NodeType=NODE_TYPE,
        NumberOfNodes=int(NUM_NODES),
        ClusterIdentifier=CLUSTER_IDENTIFIER,

        # Identifiers & credentials
        DBName=DB,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,

        # Roles (for S3 access)
        IamRoles=[ROLE_ARN]
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

In [None]:
def prettyRedshiftProps(props):
    """Return the interesting cluster properties as a two-column DataFrame.

    Args:
        props: mapping of cluster property names to values (for example one
            entry of the ``Clusters`` list returned by ``describe_clusters``).

    Returns:
        A DataFrame with columns ``Key`` and ``Value`` holding only the
        properties listed in ``keysToShow``, in the order they occur in
        ``props``. An empty mapping yields an empty DataFrame.

    Side effects:
        Sets the global pandas option ``display.max_colwidth`` so long values
        are displayed without truncation.
    """
    # None means "never truncate". The former sentinel -1 was deprecated in
    # pandas 1.0 and is rejected with a ValueError by pandas 2.x.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Cluster creation is asynchronous, and the following cell reads the
# 'Endpoint' entry, which is not populated while the cluster is still being
# created. Block until Redshift reports the cluster as available so the
# notebook also works when run top to bottom in one go.
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=CLUSTER_IDENTIFIER)

myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Host name and port of the cluster, taken from the describe_clusters result.
_endpoint = myClusterProps['Endpoint']
ENDPOINT, PORT = _endpoint['Address'], _endpoint['Port']

In [None]:
# Display the cluster host name used in the connection string below.
ENDPOINT

In [None]:
# EC2 resource, used to reach the security groups of the cluster's VPC.
ec2 = boto3.resource(
    'ec2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

In [None]:
# Open the cluster port to inbound TCP traffic on a security group of the
# cluster's VPC so the notebook can connect to the database.
# NOTE(review): 0.0.0.0/0 exposes the port to the whole internet; restrict the
# CIDR to known addresses for anything beyond a throwaway cluster.
# NOTE(review): this takes the first group returned for the VPC, which is not
# guaranteed to be the VPC's default group - confirm it is the one the cluster uses.
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(PORT),
        ToPort=int(PORT)
    )
except ClientError as e:
    # Re-running the cell hits the already-existing rule, which is harmless.
    # Every other error is a real failure and must not be swallowed.
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print(e)

In [None]:
# List every object stored under the "song_data" prefix of the source bucket.
s3 = boto3.resource(
    's3',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)
sampleDbBucket = s3.Bucket("udacity-dend")

song_objects = sampleDbBucket.objects.filter(Prefix="song_data")
for obj in song_objects:
    print(obj)

In [None]:
# List every object stored under the "log_data" prefix of the source bucket.
s3 = boto3.resource(
    's3',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)
sampleDbBucket = s3.Bucket("udacity-dend")

log_objects = sampleDbBucket.objects.filter(Prefix="log_data")
for obj in log_objects:
    print(obj)
    

In [None]:
# Download one raw log file and decode it to text for inspection.
# Note: this rebinds `s3` from the resource used in the cells above to a
# low-level client.
s3 = boto3.client(
    's3',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

result = s3.get_object(
    Key="log_data/2018/11/2018-11-13-events.json",
    Bucket="udacity-dend",
)
text = result["Body"].read().decode()

In [None]:
# Preview the first 900 characters of the downloaded log file.
print(text[:900]) # look here for the JSON keys of the values you need

In [None]:
# Load the SQL magic extension so the following cells can use %sql / %%sql.
%load_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, ENDPOINT, PORT, DB)
%sql $conn_string

In [None]:
%%sql
/* Show the most recent load error recorded in stl_load_errors (highest query id first). */
SELECT query,
       filename,
       line_number AS line,
       SUBSTRING(colname, 0, 12) AS column,
       type,
       position AS pos,
       raw_line,
       raw_field_value,
       err_reason
FROM stl_load_errors
ORDER BY query DESC, filename
LIMIT 1;

In [None]:
%%sql 
/* Staging table for the raw song files. It is dropped and recreated on every run and filled by the COPY in the next cell. */
DROP TABLE IF EXISTS song_data;
CREATE TABLE song_data (
    artist_id VARCHAR(20) NOT NULL,
    artist_latitude DOUBLE PRECISION,
    artist_location VARCHAR(200),
    artist_longitude DOUBLE PRECISION,
    artist_name VARCHAR(200) NOT NULL,
    duration DOUBLE PRECISION NOT NULL,
    num_songs INTEGER NOT NULL,
    song_id VARCHAR(20) NOT NULL,
    title VARCHAR(200) NOT NULL,
    year INTEGER NOT NULL
);

In [None]:
# Bulk-load the song JSON files from S3 into the song_data staging table.
# The cluster authenticates to S3 with the IAM role whose ARN is in ROLE_ARN.
# The 'auto' option asks COPY to match JSON keys to column names.
qry = """
    copy song_data from 's3://udacity-dend/song_data'
    credentials 'aws_iam_role={}'
    format as json 'auto';
""".format(ROLE_ARN)

# Run the statement on the open connection.
%sql $qry

In [None]:
%%sql
/* Row count check for the song_data staging table. */
SELECT COUNT(*)
FROM song_data

In [None]:
%%sql 
/* Staging table for the raw event logs. It is dropped and recreated on every run.
   The COPY that fills it uses a JSONPaths file, which maps values to columns by position,
   so the column order below is significant.
   NOTE(review) - confirm the JSONPaths file lists the fields in exactly this order. */
DROP TABLE IF EXISTS log_data;
CREATE TABLE log_data (
    artist VARCHAR(200),
    auth VARCHAR(200) NOT NULL,
    firstname VARCHAR(200),
    gender CHAR(1),
    iteminsession INTEGER,
    lastname VARCHAR(200),
    length double precision,
    level VARCHAR(20) NOT NULL,
    location VARCHAR(200),
    method VARCHAR(20) NOT NULL,
    page VARCHAR(20) NOT NULL,
    registration double precision,
    sessionid INTEGER,
    song VARCHAR(200),
    status INTEGER NOT NULL,
    ts BIGINT NOT NULL,
    useragent VARCHAR(200),
    userid INTEGER
);

In [None]:
# Bulk-load the event logs from S3 into the log_data staging table, using a
# JSONPaths file to map JSON fields to table columns.
# NOTE(review): the JSONPaths file lives in a different bucket than the data -
# confirm the cluster's role can read it and that the file is still there.
qry = """
    copy log_data from 's3://udacity-dend/log_data'
    credentials 'aws_iam_role={}'
    json 's3://olayemiodefunsho/jsonpath.json';
""".format(ROLE_ARN)

# Run the statement on the open connection.
%sql $qry

In [None]:
%%sql
/* Peek at a single loaded NextSong event. */
SELECT TOP 1 *
FROM log_data
WHERE page = 'NextSong'

In [None]:
%%sql 
/* Dimension table users, replicated to every node (DISTSTYLE ALL) and sorted by user id.
   It is dropped and recreated on every run. */
DROP TABLE IF EXISTS users;
CREATE TABLE users 
(
  u_user_id     INTEGER NOT NULL SORTKEY,
  u_first_name  VARCHAR(200) NOT NULL,
  u_last_name   VARCHAR(200) NOT NULL,
  u_gender      CHAR(1) NOT NULL,
  u_level       VARCHAR(20) NOT NULL
)
DISTSTYLE ALL;

/* One row per user id. For each user the log row with the latest ts is kept
   (row_number 1 when ordered by ts descending). Log rows without a user id are ignored. */
INSERT INTO users (u_user_id,u_first_name,u_last_name,u_gender,u_level)
WITH t1 AS
    (SELECT userid,firstname,lastname,gender,level,
    ROW_NUMBER() OVER(PARTITION BY userid ORDER BY dateadd(second, ts, '1970-01-01 00:00:00') desc) AS row_number
    FROM log_data where userid is not null)
SELECT userid,firstname,lastname,gender,level
FROM t1
WHERE t1.row_number = 1

In [None]:
%%sql
/* Row count check for the users dimension. */
SELECT COUNT(*)
FROM users

In [None]:
%%sql 
/* Dimension table songs, replicated to every node (DISTSTYLE ALL) and sorted by song id.
   It is dropped and recreated on every run. */
DROP TABLE IF EXISTS songs;
CREATE TABLE songs 
(
  s_song_id     VARCHAR(200) NOT NULL SORTKEY,
  s_title       VARCHAR(200) NOT NULL,
  s_artist_id   VARCHAR(200) NOT NULL,
  s_year        INTEGER NOT NULL,
  s_duration    DOUBLE PRECISION NOT NULL
)
DISTSTYLE ALL;

/* Copies every row of the song_data staging table. No de-duplication is applied here. */
INSERT INTO songs (s_song_id, s_title, s_artist_id, s_year, s_duration)
SELECT song_id, title, artist_id, year, duration
FROM song_data

In [None]:
%%sql
/* Row count check for the songs dimension. */
SELECT COUNT(*)
FROM songs

In [None]:
%%sql 
/* Dimension table artists, replicated to every node (DISTSTYLE ALL) and sorted by artist id.
   It is dropped and recreated on every run. */
DROP TABLE IF EXISTS artists;
CREATE TABLE artists 
(
  a_artist_id  VARCHAR(200) NOT NULL SORTKEY,
  a_name       VARCHAR(200) NOT NULL,
  a_location   VARCHAR(200),
  a_lattitude  DOUBLE PRECISION,
  a_longitude  DOUBLE PRECISION
)
DISTSTYLE ALL;

/* One row per artist id. Among the song_data rows of an artist, the one with the
   shortest artist_name is kept (row_number 1 when ordered by name length).
   Rows with equally short names are not ordered further, so the pick among them is arbitrary. */
INSERT INTO artists (a_artist_id,a_name,a_location,a_lattitude,a_longitude)
WITH t1 AS (
    SELECT artist_id, artist_name, artist_location, artist_latitude, artist_longitude, 
    ROW_NUMBER() OVER(PARTITION BY artist_id ORDER BY len(artist_name)) AS row_number
    FROM song_data)
SELECT t1.artist_id, t1.artist_name, t1.artist_location, t1.artist_latitude, t1.artist_longitude
FROM t1
WHERE t1.row_number = 1

In [None]:
%%sql
/* Row count check for the artists dimension. */
SELECT COUNT(*)
FROM artists

In [None]:
%%sql 
/* Dimension table time, replicated to every node (DISTSTYLE ALL) and sorted by start time.
   It is dropped and recreated on every run. */
DROP TABLE IF EXISTS time;
CREATE TABLE time 
(
    t_start_time DATETIME NOT NULL SORTKEY, 
    t_hour INTEGER NOT NULL, 
    t_day INTEGER NOT NULL, 
    t_week INTEGER NOT NULL, 
    t_month INTEGER NOT NULL, 
    t_year INTEGER NOT NULL , 
    t_weekday INTEGER NOT NULL
)
DISTSTYLE ALL;

/* One row per distinct ts value found in log_data (GROUP BY ts), split into its date parts.
   All log rows contribute, not only NextSong events.
   NOTE(review) - DATEADD(second, ts, ...) interprets ts as a number of seconds since the epoch.
   Confirm the unit stored in log_data.ts, since a millisecond value would have to be divided by 1000 first. */
INSERT INTO time (t_start_time,t_hour,t_day,t_week,t_month ,t_year ,t_weekday)
SELECT 
DATEADD(second, ts, '1970-01-01 00:00:00'), 
DATEPART(HOUR, DATEADD(SECOND, ts, '1970-01-01 00:00:00')),  
DATEPART(DAY, DATEADD(SECOND, ts, '1970-01-01 00:00:00')),  
DATEPART(WEEK, DATEADD(SECOND, ts, '1970-01-01 00:00:00')),  
DATEPART(MONTH, DATEADD(SECOND, ts, '1970-01-01 00:00:00')),  
DATEPART(YEAR, DATEADD(SECOND, ts, '1970-01-01 00:00:00')),
DATEPART(WEEKDAY, DATEADD(SECOND, ts, '1970-01-01 00:00:00'))  
FROM log_data
GROUP BY ts
ORDER BY ts

In [None]:
%%sql
/* Row count check for the time dimension. */
SELECT COUNT(*)
FROM time

In [None]:
%%sql
/* Per-slice value counts for column 0 of the songplays table, read from svv_diskusage.
   The songplays table is created in a later cell, so on a fresh run this returns
   no rows until that cell has been executed. */
SELECT slice,
       col,
       num_values AS rows,
       minvalue,
       maxvalue
FROM svv_diskusage
WHERE name = 'songplays'
  AND col = 0
  AND rows > 0
ORDER BY slice, col;

In [None]:
%%sql
/* Fact table songplays, spread evenly over the slices (DISTSTYLE EVEN) and sorted by its id.
   It is dropped and recreated on every run. */
DROP TABLE IF EXISTS songplays;
CREATE TABLE songplays 
(
    sp_songplay_id INTEGER NOT NULL SORTKEY, 
    sp_start_time DATETIME NOT NULL, 
    sp_user_id INTEGER NOT NULL, 
    sp_level VARCHAR(20) NOT NULL, 
    sp_song_id VARCHAR(20) NOT NULL, 
    sp_artist_id VARCHAR(20) NOT NULL, 
    sp_session_id INTEGER NOT NULL, 
    sp_location VARCHAR(200) NOT NULL, 
    sp_user_agent VARCHAR(200)
)
DISTSTYLE EVEN;

/* One row per NextSong event whose song title exists in songs.
   The song id and the artist id are taken from the SAME songs row. That row is picked once
   per title in a deterministic way (lowest song id), instead of by two independent TOP 1
   lookups without ORDER BY, which could pair the song id of one song with the artist id of
   another song sharing the title and could differ between runs.
   The user level comes from the users dimension, which holds one row per user id.
   NOTE(review) - the id is built by concatenating the digits of sessionid and iteminsession,
   so different pairs can collide (1 and 23 versus 12 and 3 both give 123). Consider an
   IDENTITY column or a collision-free formula.
   NOTE(review) - DATEADD(SECOND, ts, ...) interprets ts as seconds since the epoch. Confirm
   the unit stored in log_data.ts. */
INSERT INTO songplays (sp_songplay_id,sp_start_time,sp_user_id,sp_level,sp_song_id,sp_artist_id,sp_session_id,sp_location,sp_user_agent)
SELECT CAST(CAST(l.sessionid AS VARCHAR) + CAST(l.iteminsession AS VARCHAR) AS INTEGER),
       DATEADD(SECOND, l.ts, '1970-01-01 00:00:00'),
       l.userid,
       u.u_level,
       s.s_song_id,
       s.s_artist_id,
       l.sessionid,
       l.location,
       l.useragent
FROM log_data l
JOIN (
    SELECT s_title,
           s_song_id,
           s_artist_id,
           ROW_NUMBER() OVER (PARTITION BY s_title ORDER BY s_song_id) AS rn
    FROM songs
) s ON s.s_title = l.song AND s.rn = 1
LEFT JOIN users u ON u.u_user_id = l.userid
WHERE l.page = 'NextSong'

In [None]:
%%sql
/* Row count check for the songplays fact table. */
SELECT COUNT(*)
FROM songplays

In [None]:
#### CAREFUL!!
#-- This cell is live (not commented out): running it, including via "Run All",
#-- deletes the Redshift cluster, and no final snapshot is taken.
redshift.delete_cluster( ClusterIdentifier=CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

In [None]:
#### CAREFUL!!
#-- This cell is live (not commented out): running it, including via "Run All",
#-- detaches the policy from the IAM role and then deletes the role.
iam.detach_role_policy(RoleName=ROLE_NAME, PolicyArn=POLICY_ARN)
iam.delete_role(RoleName=ROLE_NAME)
#### CAREFUL!!