# ETL

Steps to create a data warehouse for analyzing Sparkify song-play data.
- STEP 0: Activate S3 service and check data on buckets.
- STEP 1: Extract data from S3 to Redshift as Staging tables.
- STEP 2: Transform and Load data into Analytics tables.
- STEP 3: Explore analytic queries.

<img src="images/etl_step.png" width="90%"/>

In [None]:
import pandas as pd
import boto3
import json
import configparser
import pandas as pd
from humanize import naturalsize
# Parse dwh.cfg, which holds the AWS, S3, CLUSTER and IAM_ROLE sections read
# by the cells below. The file is opened in a `with` block so the handle is
# closed as soon as parsing finishes instead of being left open.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)
%load_ext sql

# STEP 0: Activate S3 service and check data on buckets

## 0.1 Activate S3

In [None]:
# AWS access key pair and the bucket to inspect, as configured in dwh.cfg.
KEY = config.get(section='AWS', option='KEY')
SECRET = config.get(section='AWS', option='SECRET')
BUCKET_NAME = config.get(section="S3", option="BUCKET_NAME")

In [None]:
# S3 resource handle for the us-west-2 region, authenticated with the access
# key pair read from the config.
s3 = boto3.resource(
    's3',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

## 0.2 Check data on Bucket 

In [None]:
def bucketSummary(bucket_name, prefix):
    """Print a size summary for an S3 prefix and preview one of its files.

    Lists every object under ``prefix`` in ``bucket_name`` through the
    notebook-level ``s3`` resource, prints the object count and their total
    size, then loads one object as line-delimited JSON and displays its
    first two rows.

    Args:
        bucket_name: Name of the S3 bucket to inspect.
        prefix: Key prefix used to filter the listed objects.

    Returns:
        None. All results are printed or displayed.
    """
    # Materialise the listing once so it can be counted, summed and sampled.
    obj_list = list(s3.Bucket(bucket_name).objects.filter(Prefix=prefix))
    total_files = len(obj_list)
    total_size = naturalsize(sum(obj.size for obj in obj_list))
    print(f"Total_files: {total_files}\nTotal size: {total_size}")

    # Preview the first key that is an actual file. Keys ending in "/" are
    # folder placeholders and cannot be parsed as JSON. Selecting by content
    # rather than by a fixed index also avoids an IndexError when the prefix
    # holds fewer than two objects.
    sample = next((obj for obj in obj_list if not obj.key.endswith("/")), None)
    if sample is None:
        print(f"No files found under prefix '{prefix}'; nothing to preview.")
        return

    # Reading an s3:// URL with pandas needs the s3fs package (pip install s3fs).
    sample_file_path = f"s3://{bucket_name}/{sample.key}"
    df = pd.read_json(sample_file_path, lines=True)
    print("Sample file path: ",sample_file_path )
    display(df.head(2))

In [None]:
# Summarise and preview the event log files.
bucketSummary(bucket_name=BUCKET_NAME, prefix='log_data')

In [None]:
# Summarise and preview the song metadata files.
bucketSummary(bucket_name=BUCKET_NAME, prefix='song_data')

#  
# STEP 1: Extract data from S3 to Redshift as Staging tables

## 1.1 Connect to Database

In [None]:
# Redshift connection settings from the CLUSTER section of the config.
DB_NAME = config.get(section="CLUSTER", option="DB_NAME")
DB_USER = config.get(section="CLUSTER", option="DB_USER")
DB_PASSWORD = config.get(section="CLUSTER", option="DB_PASSWORD")
DB_PORT = config.get(section="CLUSTER", option="DB_PORT")
HOST = config.get(section="CLUSTER", option="HOST")

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, HOST, DB_PORT,DB_NAME)
print(conn_string)
%sql $conn_string

In [None]:
%%sql 
DROP TABLE IF EXISTS "staging_events", "staging_songs";  -- clear staging tables left by an earlier run, if any

## 1.2 Create staging tables

In [None]:
%%sql 
DROP TABLE IF EXISTS "staging_events";
DROP TABLE IF EXISTS "staging_songs";

-- Raw event log rows as loaded by COPY. event_id is generated by the
-- cluster. The table is sorted and distributed on sessionId.
CREATE TABLE "staging_events" (
                event_id      INT IDENTITY(0,1)    NOT NULL,
                artist        VARCHAR              NULL,
                auth          VARCHAR              NULL,
                firstName     VARCHAR              NULL,
                gender        CHAR (1)             NULL,
                itemInSession VARCHAR              NULL,
                lastName      VARCHAR              NULL,
                -- Floating point on purpose. An unscaled NUMERIC is
                -- DECIMAL(18,0) on Redshift and cannot hold fractional seconds.
                length        DOUBLE PRECISION     NULL,
                level         VARCHAR              NULL,
                location      VARCHAR              NULL,
                method        VARCHAR              NULL,
                page          VARCHAR              NULL,
                registration  NUMERIC              NULL,
                sessionId     INTEGER              NOT NULL SORTKEY DISTKEY,
                song          VARCHAR              NULL,
                status        INTEGER              NULL,
                -- Kept as NUMERIC so that ts/1000 in the later INSERT
                -- statements is a decimal division, not an integer one.
                ts            NUMERIC              NOT NULL,
                userAgent     VARCHAR              NULL,
                userId        INTEGER              NULL
);

-- Raw song metadata rows as loaded by COPY. The table is sorted and
-- distributed on artist_id. The preceding DROP guarantees the table is
-- absent, so a plain CREATE TABLE is used for both staging tables.
CREATE TABLE "staging_songs" (
                num_songs           INTEGER             NULL,
                artist_id           VARCHAR             NOT NULL SORTKEY DISTKEY,
                artist_latitude     VARCHAR             NULL,
                artist_longitude    VARCHAR             NULL,
                artist_location     VARCHAR             NULL,
                artist_name         VARCHAR             NULL,
                song_id             VARCHAR             NOT NULL,
                title               VARCHAR             NULL,
                -- Floating point for the same reason as staging_events.length.
                duration            DOUBLE PRECISION    NULL,
                year                INTEGER             NULL
);

## 1.3 Copy data from S3 to staging table

In [None]:
# IAM role and S3 locations that are substituted into the COPY statements.
IAM_ROLE_ARN = config.get(section="IAM_ROLE", option="IAM_ROLE_ARN")
LOG_DATA = config.get(section="S3", option="LOG_DATA")
SONG_DATA = config.get(section="S3", option="SONG_DATA")
LOG_JSONPATH = config.get(section="S3", option="LOG_JSONPATH")

#### Import log events data

In [None]:
%%time

# Build the Redshift COPY statement that loads the event logs into
# staging_events, using the IAM role for access and LOG_JSONPATH as the
# argument of the json format option.
# NOTE(review): LOG_DATA and LOG_JSONPATH are inserted without added quotes,
# so dwh.cfg presumably stores them already single-quoted -- confirm.
qry = """
    COPY staging_events FROM {}
    CREDENTIALS 'aws_iam_role={}' 
    FORMAT as json {}
    region 'us-west-2';
""".format(LOG_DATA, IAM_ROLE_ARN, LOG_JSONPATH)

# Run the statement through the %sql magic; %%time reports how long the cell takes.
%sql $qry

#### Import song data

In [None]:
%%time

# Build the Redshift COPY statement that loads the song files into
# staging_songs, using the IAM role for access and the json 'auto' option.
# NOTE(review): SONG_DATA is inserted without added quotes, so dwh.cfg
# presumably stores it already single-quoted -- confirm.
qry = """
    COPY staging_songs FROM {}
    credentials 'aws_iam_role={}'
    format as json 'auto'
    region 'us-west-2';
""".format(SONG_DATA, IAM_ROLE_ARN)

# Run the statement through the %sql magic; %%time reports how long the cell takes.
%sql $qry

## 1.4 Check imported data

#### Check data on log event staging table

In [None]:
%%sql 
SELECT COUNT(*) FROM "staging_events";  -- number of event rows loaded by COPY

In [None]:
%%sql 
SELECT * FROM "staging_events"
ORDER BY "event_id" LIMIT 2  -- first two rows by generated event_id

#### Check data on songs staging table

In [None]:
%%sql 
SELECT COUNT(*) FROM "staging_songs";  -- number of song rows loaded by COPY

In [None]:
%%sql 
SELECT * FROM "staging_songs" LIMIT 2  -- two sample rows, in no particular order

#  

# STEP 2: Transform and Load data into Analytics tables

## 2.1 Create Analytics tables

In [None]:
%%sql 

DROP TABLE IF EXISTS "songplays";
DROP TABLE IF EXISTS "users";
DROP TABLE IF EXISTS "songs";
DROP TABLE IF EXISTS "artists";
DROP TABLE IF EXISTS "time";

-- Fact table with one row per song play. Distributed on user_id and sorted
-- on the generated songplay_id.
-- NOTE(review) - user_id and session_id are VARCHAR here while users.user_id
-- and the staging columns are INTEGER. Confirm whether that is intended.
CREATE TABLE IF NOT EXISTS "songplays" (
                songplay_id INTEGER IDENTITY(0,1)   NOT NULL SORTKEY,
                start_time  TIMESTAMP               NOT NULL,
                user_id     VARCHAR(50)             NOT NULL DISTKEY,
                level       VARCHAR(10)             NOT NULL,
                song_id     VARCHAR(40)             NOT NULL,
                artist_id   VARCHAR(50)             NOT NULL,
                session_id  VARCHAR(50)             NOT NULL,
                location    VARCHAR(100)            NULL,
                user_agent  VARCHAR(255)            NULL
                );

-- User dimension. A full copy is kept on every node (diststyle all).
CREATE TABLE IF NOT EXISTS "users" (
                user_id     INTEGER                 NOT NULL SORTKEY,
                first_name  VARCHAR(50)             NULL,
                last_name   VARCHAR(80)             NULL,
                gender      VARCHAR(10)             NULL,
                level       VARCHAR(10)             NULL
                ) diststyle all;

-- Song dimension.
CREATE TABLE IF NOT EXISTS "songs" (
                song_id     VARCHAR(50)             NOT NULL SORTKEY,
                title       VARCHAR(500)            NOT NULL,
                artist_id   VARCHAR(50)             NOT NULL,
                year        INTEGER                 NOT NULL,
                -- Scale is explicit because DECIMAL(9) means DECIMAL(9,0),
                -- which would round the duration to whole seconds.
                duration    DECIMAL(10,5)           NOT NULL
                );

-- Artist dimension. A full copy is kept on every node (diststyle all).
CREATE TABLE IF NOT EXISTS "artists" (
                artist_id   VARCHAR(50)             NOT NULL SORTKEY,
                name        VARCHAR(500)            NULL,
                location    VARCHAR(500)            NULL,
                -- Six decimal places. DECIMAL(9) would store whole degrees only.
                latitude    DECIMAL(9,6)            NULL,
                longitude   DECIMAL(9,6)            NULL
                ) diststyle all;


-- Time dimension with the calendar parts of each play timestamp.
CREATE TABLE IF NOT EXISTS "time" (
                start_time  TIMESTAMP               NOT NULL SORTKEY,
                hour        SMALLINT                NULL,
                day         SMALLINT                NULL,
                week        SMALLINT                NULL,
                month       SMALLINT                NULL,
                year        SMALLINT                NULL,
                weekday     SMALLINT                NULL
                ) diststyle all;

## 2.2 Insert data

In [None]:
%%sql 

INSERT INTO "songplays" (start_time,user_id,level,song_id,artist_id,session_id,location,user_agent)
-- One row per NextSong event that can be matched to a catalogued song.
-- The match uses song title AND artist name. Matching on artist name alone
-- would pair each play with every song by that artist and multiply rows.
-- ts is epoch milliseconds, hence the division by 1000.
SELECT  DISTINCT TIMESTAMP 'epoch' + se.ts/1000* INTERVAL '1 second'   AS start_time,
        se.userId                   AS user_id,
        se.level                    AS level,
        ss.song_id                  AS song_id,
        ss.artist_id                AS artist_id,
        se.sessionId                AS session_id,
        se.location                 AS location,
        se.userAgent                AS user_agent
FROM "staging_events" AS se
JOIN "staging_songs" AS ss ON (se.song = ss.title AND se.artist = ss.artist_name)
WHERE se.page = 'NextSong';


INSERT INTO "users" (user_id,first_name,last_name,gender,level)
-- Exactly one row per user, taken from the most recent NextSong event of
-- that user, so a user who changed level is stored once with the latest
-- level. event_id breaks ties between events with the same ts.
SELECT  latest.user_id,
        latest.first_name,
        latest.last_name,
        latest.gender,
        latest.level
FROM (
        SELECT  se.userId           AS user_id,
                se.firstName        AS first_name,
                se.lastName         AS last_name,
                se.gender           AS gender,
                se.level            AS level,
                ROW_NUMBER() OVER (PARTITION BY se.userId
                                   ORDER BY se.ts DESC, se.event_id DESC) AS recency
        FROM "staging_events" AS se
        WHERE se.page = 'NextSong'
          AND se.userId IS NOT NULL
     ) AS latest
WHERE latest.recency = 1;


INSERT INTO "songs" (song_id,title,artist_id,year,duration)
-- Distinct song rows copied straight from staging.
SELECT  DISTINCT ss.song_id         AS song_id,
        ss.title                    AS title,
        ss.artist_id                AS artist_id,
        ss.year                     AS year,
        ss.duration                 AS duration
FROM "staging_songs" AS ss;


INSERT INTO "artists" (artist_id,name,location,latitude,longitude)
-- Distinct artist rows. DISTINCT applies to the whole row, so an artist_id
-- can still appear more than once when its name or location varies.
SELECT  DISTINCT ss.artist_id       AS artist_id,
        ss.artist_name              AS name,
        ss.artist_location          AS location,
        ss.artist_latitude          AS latitude,
        ss.artist_longitude         AS longitude
FROM "staging_songs" AS ss;


INSERT INTO "time" (start_time,hour,day,week,month,year,weekday)
-- Calendar parts of every distinct play timestamp. The EXTRACT calls refer
-- to the start_time alias defined in the first select item.
SELECT  DISTINCT TIMESTAMP 'epoch' + se.ts/1000* INTERVAL '1 second' AS start_time,
        EXTRACT(hour FROM start_time)    AS hour,
        EXTRACT(day FROM start_time)     AS day,
        EXTRACT(week FROM start_time)    AS week,
        EXTRACT(month FROM start_time)   AS month,
        EXTRACT(year FROM start_time)    AS year,
        EXTRACT(weekday FROM start_time) AS weekday
FROM  "staging_events" AS se
WHERE se.page = 'NextSong';

## 2.3 Check inserted data

In [None]:
%%sql 
SELECT *
FROM "songplays"
LIMIT 1;

In [None]:
%%sql 
SELECT *
FROM "users"
LIMIT 1;

In [None]:
%%sql 
SELECT *
FROM "songs"
LIMIT 1;

In [None]:
%%sql 
SELECT *
FROM "artists"
LIMIT 1;

In [None]:
%%sql 
SELECT *
FROM "time"
LIMIT 1;

# STEP 3: Explore analytic queries

#### Top 10 Songs by number of plays.

In [None]:
%%sql 

SELECT s.title AS top_song, a.name AS artist, COUNT(DISTINCT sp.songplay_id) AS play_times
FROM "songplays" AS sp
JOIN "songs" AS s ON sp.song_id = s.song_id
JOIN "artists" AS a ON sp.artist_id = a.artist_id
-- The dimension tables are loaded with DISTINCT over whole rows, so one id
-- can appear on several rows. Counting distinct songplay_id values keeps
-- that join fan-out from counting a single play more than once.
GROUP BY s.title, a.name
ORDER BY play_times DESC
LIMIT 10

#### Top 10 Artists by number of song plays.

In [None]:
%%sql 

SELECT a.name AS top_artist, COUNT(DISTINCT sp.songplay_id) AS song_plays
FROM "songplays" AS sp
JOIN "artists" AS a ON sp.artist_id = a.artist_id
-- The artists table is loaded with DISTINCT over whole rows, so one
-- artist_id can appear on several rows. Counting distinct songplay_id values
-- keeps that join fan-out from counting a single play more than once.
GROUP BY a.name
ORDER BY song_plays DESC
LIMIT 10