## Import


In [None]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time

# Cluster connection settings, read from the [CLUSTER] section of dwh.cfg.
# The file is opened in a `with` block so the handle is closed as soon as
# parsing finishes instead of lingering for the lifetime of the kernel.
config = configparser.ConfigParser()
with open("dwh.cfg") as cluster_cfg_file:
    config.read_file(cluster_cfg_file)

HOST = config.get("CLUSTER", "HOST")
DWH_DB = config.get("CLUSTER", "DB_NAME")
DWH_DB_USER = config.get("CLUSTER", "DB_USER")
DWH_DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DWH_PORT = config.get("CLUSTER", "DB_PORT")

# AWS access key pair, read from the [AWS] section of the separate
# .credential.cfg file.
config_credential = configparser.ConfigParser()
with open(".credential.cfg") as credential_cfg_file:
    config_credential.read_file(credential_cfg_file)
KEY = config_credential.get("AWS", "KEY")
SECRET = config_credential.get("AWS", "SECRET")


In [None]:
%load_ext sql

In [None]:
import os 
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, HOST, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

---
#### Inspect S3 dataset : log_data


In [None]:
import json

# S3 resource handle authenticated with the KEY / SECRET pair loaded earlier,
# and a handle on the "udacity-dend" bucket that later cells reuse.
s3 = boto3.resource(
    "s3",
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)
sampleDbBucket = s3.Bucket("udacity-dend")

# Print the first five objects under the "log_data" prefix, then stop.
# `cnt` ends up holding how many objects were printed (0 if none matched).
cnt = 0
for cnt, obj in enumerate(sampleDbBucket.objects.filter(Prefix="log_data"), start=1):
    print(obj)
    if cnt >= 5:
        break


#### Inspect S3 dataset : song_data


In [None]:
# Print the first five objects under the "song_data" prefix, then stop.
# `cnt` ends up holding how many objects were printed (0 if none matched).
cnt = 0
for cnt, obj in enumerate(sampleDbBucket.objects.filter(Prefix="song_data"), start=1):
    print(obj)
    if cnt >= 5:
        break


---

## Create Tables

![star schema](star-schema.png "star schema")


In [None]:
def run_create_tables():
    """Import the project's ``create_tables`` module and run its ``main()``.

    The import is done inside the function so the module name does not stay
    bound in the notebook namespace. Python still caches the module after the
    first import, so calling this again reuses the already-loaded module.
    """
    import create_tables as schema_script

    schema_script.main()


run_create_tables()


---

## Inspect created tables


### Show list of tables


In [None]:
%sql SELECT DISTINCT tablename FROM PG_TABLE_DEF WHERE schemaname = 'public';

---

## ETL


### COPY Staging_events, Staging_songs


In [None]:
def run_etl():
    """Import the project's ``etl`` module and run its ``main()``.

    The import is done inside the function so the module name does not stay
    bound in the notebook namespace. Python still caches the module after the
    first import, so calling this again reuses the already-loaded module.
    """
    import etl as etl_script

    etl_script.main()

run_etl()


### [Optional] `COPY` Staging_songs in smaller batches
If the Redshift cluster has only a few nodes (my setup has 2), a `COPY` of the full dataset takes too long and ends up `ABORTED`.

In [None]:

import configparser
import psycopg2


def load_staging_tables(cur=None, conn=None, iam_role_arn="arn:aws:iam::975113008954:role/dwhRole"):
    """COPY the song dataset into ``staging_songs`` one S3 prefix at a time.

    Issues one ``COPY`` per prefix ``s3://udacity-dend/song_data/A/<letter>``
    for every letter A-Z, committing after each so a failure in a later batch
    does not discard the batches already loaded. Each statement is printed
    before it runs, followed by a short progress marker.

    Args:
        cur: Optional DB-API cursor. Only used when ``conn`` is also given;
            if ``conn`` is given without ``cur``, a cursor is created from it.
        conn: Optional open DB-API connection. When omitted, a connection is
            opened from the ``[CLUSTER]`` section of ``dwh.cfg`` (keys
            ``HOST``, ``DB_NAME``, ``DB_USER``, ``DB_PASSWORD``, ``DB_PORT``)
            and closed again before returning.
        iam_role_arn: ARN of the IAM role passed to ``COPY``. Defaults to the
            role that was previously hard-coded in this cell.

    Raises:
        Whatever the database driver raises when connecting or executing a
        ``COPY``. A connection opened here is still closed in that case.
    """
    from string import ascii_uppercase

    staging_songs_copy = (
        """
    COPY staging_songs from '{}'
    credentials 'aws_iam_role={}'
    FORMAT AS JSON 'auto'
    REGION 'us-west-2'
    """
    )

    # Only close the connection at the end if this function opened it.
    owns_connection = conn is None
    if owns_connection:
        config = configparser.ConfigParser()
        config.read('dwh.cfg')
        cluster = config['CLUSTER']
        # Look settings up by key instead of relying on the order in which
        # they happen to appear in the file.
        conn = psycopg2.connect(
            host=cluster['HOST'],
            dbname=cluster['DB_NAME'],
            user=cluster['DB_USER'],
            password=cluster['DB_PASSWORD'],
            port=cluster['DB_PORT'],
        )
        cur = conn.cursor()
    elif cur is None:
        cur = conn.cursor()

    try:
        # ascii_uppercase covers every letter exactly once (the previous
        # hand-typed alphabet skipped "S" and listed "Z" twice).
        for char in ascii_uppercase:
            s3_path = "s3://udacity-dend/song_data/A/{}".format(char)
            query = staging_songs_copy.format(s3_path, iam_role_arn)
            print(query)
            cur.execute(query)
            conn.commit()
            print(f"...{char}")
    finally:
        if owns_connection:
            conn.close()



load_staging_tables()


### Test Query Staging_events


In [None]:
%sql SELECT * FROM staging_events limit 10; 

In [None]:
%sql SELECT COUNT(*) FROM staging_events; 

### Test Query Staging_songs

In [None]:

%sql SELECT * FROM staging_songs limit 10; 

In [None]:
%sql SELECT COUNT(*) FROM staging_songs;


#### Test Query

In [None]:
%sql SELECT COUNT(*) FROM songplays

In [None]:

%sql SELECT COUNT(*) FROM users

In [None]:

%sql SELECT COUNT(*) FROM songs

In [None]:

%sql SELECT COUNT(*) FROM artists

In [None]:
%sql SELECT COUNT(*) FROM time

---
### [Optional] Manual `INSERT`

#### INSERT songplays

In [None]:
%%sql 
INSERT INTO songplays (
    start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
)
SELECT
    ev.ts,
    ev.userId,
    ev.level,
    sg.song_id,
    sg.artist_id,
    ev.sessionId,
    ev.location,
    ev.userAgent
FROM staging_events AS ev
INNER JOIN staging_songs AS sg
    ON ev.song = sg.title
   AND ev.artist = sg.artist_name

#### INSERT users

In [None]:
%%sql 
INSERT INTO users (
    user_id, first_name, last_name, gender, level
)
SELECT DISTINCT
    userId,
    firstName,
    lastName,
    gender,
    level
FROM staging_events
WHERE userId IS NOT NULL

#### INSERT songs

In [None]:

%%sql 
INSERT INTO songs (
    song_id, title, artist_id, year, duration
)
SELECT DISTINCT
    song_id,
    title,
    artist_id,
    year,
    duration
FROM staging_songs
WHERE song_id IS NOT NULL

#### INSERT artists

In [None]:

%%sql 
INSERT INTO artists (
    artist_id, name, location, latitude, longitude
)
SELECT DISTINCT
    artist_id,
    artist_name,
    artist_location,
    artist_latitude,
    artist_longitude
FROM staging_songs
WHERE artist_id IS NOT NULL

#### INSERT time

In [None]:

%%sql 
INSERT INTO time (
    start_time, hour, day, week, year, weekday
)
SELECT DISTINCT
    ts,
    EXTRACT(HOUR FROM ts),
    EXTRACT(DAY FROM ts),
    EXTRACT(WEEK FROM ts),
    EXTRACT(YEAR FROM ts),
    EXTRACT(WEEKDAY FROM ts)
FROM staging_events
WHERE ts IS NOT NULL