In [1]:
import configparser
import psycopg2
import pandas as pd
from time import time
import matplotlib.pyplot as plt
import boto3
import json

In [2]:
# CONFIG
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

DB_NAME                 = config.get("CLUSTER","DB_NAME")
DB_USER                 = config.get("CLUSTER","DB_USER")
DB_PASSWORD             = config.get("CLUSTER","DB_PASSWORD")
DB_PORT                 = config.get("CLUSTER","DB_PORT")
HOST                    =config.get("CLUSTER", "HOST")

pd.DataFrame({"Param":
                  ["DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"],
              "Value":
                  [DB_NAME, DB_USER, DB_PASSWORD, DB_PORT]
             })

Unnamed: 0,Param,Value
0,DB_NAME,dwh
1,DB_USER,dwhuser
2,DB_PASSWORD,Passw0rd
3,DB_PORT,5439


### Creating IAM User, EC2, S3 and Redshift

In [3]:
ec2 = boto3.resource('ec2',
                       region_name="us-west-2"
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2"
                   )

iam = boto3.client('iam', 
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2"
                       )

#### Getting a sample song data

In [None]:
# S3 path
# s3://udacity-dend/log_json_path.json

file = s3.Object("udacity-dend", 'log_json_path.json')
content = json.loads(file.get()['Body'].read())
print(json.dumps(content, indent=4, sort_keys=True))


#### Getting s3 objects using S3 Resource

In [None]:
obj_col = s3.Bucket("udacity-dend").objects.filter(Prefix = 'song_data/')

# print(list(obj_col)[0])
for i, obj in enumerate(obj_col):
    print(obj.key)
    if i > 10:
        break


Example of getting list of objects paths in S3 using Client (just another way of doing it)

In [None]:
s3_client = boto3.client('s3',
                       region_name="us-west-2"
                   )

lst_contents = s3_client.list_objects(Bucket="udacity-dend", Prefix='song_data')['Contents']

print("list length: {}".format(len(lst_contents)))

print(lst_contents[0])
print(lst_contents[1])

##### Extracting the content from 1 file in S3 bucket as a sample data

Extracting song_data sample

In [None]:
# files = s3.Bucket("udacity-dend").objects.filter(Prefix = 'song_data')

# getting a sample data from the original dataset
file = s3.Object("udacity-dend", 'song_data/A/A/A/TRAAAAV128F421A322.json')

# printing in a pretty JSON format
content = json.loads(file.get()['Body'].read())
print(json.dumps(content, indent=4))

Extracting song_data sample

In [None]:
obj_col = s3.Bucket("udacity-dend").objects.filter(Prefix = 'log_data/')

# print(list(obj_col)[0])
for i, obj in enumerate(obj_col):
    print(obj.key)
    if i > 10:
        break

In [None]:
# obj_log = s3.Object("udacity-dend", 'log_data/2018/11/2018-11-03-events.json')
obj_log = s3.Object("udacity-dend", 'log_data/2018/11/2018-11-06-events.json')
	# s3://udacity-dend/log_data/2018/11/2018-11-06-events.json


content_log = obj_log.get()['Body'].read().decode("utf-8")

df_log = pd.read_json(path_or_buf=content_log, orient='records', lines=True)
df_log.head(10)
# df_log[df_log.page == 'NextSong'].head(10)

In [None]:
# original
print("Original Extraction")
print("Obj type (Bytes): {0}".format(type(obj_log.get()['Body'].read())))
print("Obj type (Converted to string): {0}".format(type(obj_log.get()['Body'].read().decode("utf-8"))))

# print("Sample")
print("Sample string \n {0}".format(obj_log.get()['Body'].read().decode("utf-8")[:1000])) # print the first 1000 characters

print()
print("Converted Pandas Dataframe")
df_log.head(5)

### Connect to Redshift Cluster

In [None]:
%load_ext sql

In [None]:
import os 
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, HOST, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

### Create tables

In [None]:
%%sql
select * from tb_dim_users;

In [4]:
from sql_queries import copy_table_queries, insert_table_queries

def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        print(query)[:50]
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    for query in insert_table_queries:
        print(query)[:50]
        cur.execute(query)
        conn.commit()

config = configparser.ConfigParser()
config.read('dwh.cfg')

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

load_staging_tables(cur, conn)
insert_tables(cur, conn)

conn.close()



INSERT INTO tb_fact_songplays 
(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT
    a.ts            as start_time,      
    a.userId        as user_id,         
    a.level         as level,           
    b.song_id       as song_id,         
    b.artist_id     as artist_id,       
    a.sessionId     as session_id,      
    a.location      as location,        
    a.userAgent     as user_agent     
FROM tb_staging_events as a
JOIN tb_staging_songs as b
    on upper(trim(a.artist)) = upper(trim(b.artist_name))
    and upper(trim(a.song)) = upper(trim(b.title))
WHERE trim(a.page) = 'NextSong'
;


INSERT INTO tb_dim_users
(user_id, first_name, last_name, gender, level)
SELECT
    userId as user_id,     
    firstName as first_name,
    lastName as last_name,
    gender as gender,
    level as level
FROM tb_staging_events
WHERE trim(page) = 'NextSong'
;


INSERT INTO tb_dim_songs
(song_id, title, artist_id, year, duration)
SELECT
    song_id as

In [None]:
%%sql
select 
count(1)
from tb_staging_events

In [None]:
%%sql
select 
count(1)
from tb_staging_songs

In [None]:
%%sql
SELECT
    count(1) as qtd
FROM tb_staging_events as a
LEFT JOIN tb_staging_songs as b
    on upper(trim(a.artist)) = upper(trim(b.artist_name))
    and upper(trim(a.song)) = upper(trim(b.title))
WHERE trim(a.page) = 'NextSong'