# ETL FOR 1 TESTING FILE

In [1]:
%load_ext sql

In [2]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time
import os

## 1. Get the params of the created redshift cluster 

- We need:
    - The redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that give access to Redshift to read from S3

In [3]:
config = configparser.ConfigParser()
config.read_file(open(os.getcwd()+'/test_dwh.cfg'))

KEY=config.get('AWS','key')
SECRET= config.get('AWS','secret')

DWH_DB= config.get("DWH","DWH_DB")
DWH_DB_USER= config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")

DWH_ENDPOINT = config.get("DWH","DWH_ENDPOINT")
DWH_ROLE_ARN = config.get("DWH","DWH_ROLE_ARN")

## 2. Connect to the Redshift Cluster

In [4]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
%sql $conn_string

'Connected: dwhuser@dwh'

## 3. Create Tables

## Schema for Song Play Analysis:
Using the song and event datasets, you'll need to create a star schema optimized for queries on song play analysis. This includes the following tables.

**The primary benefit of a star schema is its simplicity for users to write, and databases to process: queries are written with simple inner joins between the facts and a small number of dimensions.**

### Fact Table
- **songplays** - records in log data associated with song plays i.e. records with page NextSong: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent. Note: Upsert on level attribure.

### Dimension Tables
- **users** users in the app: user_id, first_name, last_name, gender, level. Note: Upsert on level attribure.
- **songs** songs in music database: song_id, title, artist_id, year, duration
- **artists** artists in music database: artist_id, name, location, latitude, longitude
- **time** timestamps of records in songplays broken down into specific units: start_time, hour, day, week, month, year, weekday

### 3.1 Staging tables:
- Load data from S3 to staging tables on Redshift.
- There are 2 staging tables (more detail from  first look datasets):
    + song dataset.
    + log dataset.

#### 3.1.1 Song Dataset:

In [5]:
%%sql
CREATE SCHEMA IF NOT EXISTS staging;
SET search_path TO staging;

DROP TABLE IF EXISTS song_dataset;
DROP TABLE IF EXISTS log_dataset;
DROP TABLE IF EXISTS raw_fact_table;

CREATE TABLE IF NOT EXISTS song_dataset 
(
    artist_id varchar,
    artist_latitude varchar,
    artist_location varchar,
    artist_longitude varchar,
    artist_name varchar,
    duration varchar,
    num_songs varchar,
    song_id varchar,
    title varchar,
    year varchar
);

CREATE TABLE IF NOT EXISTS log_dataset 
(
    artist varchar,
    auth varchar,
    firstname varchar,
    gender varchar,
    iteminsession varchar,
    lastname varchar,
    length varchar,
    level varchar,
    location varchar,
    method varchar,
    page varchar,
    registration varchar,
    sessionid varchar,
    song varchar,
    status varchar,
    ts varchar,
    useragent varchar,
    userid varchar
);

CREATE TABLE IF NOT EXISTS raw_fact_table 
(
    artist varchar,
    auth varchar,
    firstname varchar,
    gender varchar,
    iteminsession varchar,
    lastname varchar,
    length varchar,
    level varchar,
    location varchar,
    method varchar,
    page varchar,
    registration varchar,
    sessionid varchar,
    song varchar,
    status varchar,
    ts varchar,
    useragent varchar,
    userid varchar,
    artist_id varchar,
    song_id varchar
);


 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

#### 3.1.2 Loading single json file into Song Dataset:

In [6]:
%%time
qry = """
    copy song_dataset from 's3://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json'
    credentials 'aws_iam_role={}'
    json 'auto'
    region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 1.36 ms, sys: 3.47 ms, total: 4.83 ms
Wall time: 4 s


#### 3.1.3 Loading single json file into Log Dataset:

In [7]:
%%time
qry = """
    copy log_dataset from 's3://udacity-dend/log_data/2018/11/2018-11-01-events.json'
    credentials 'aws_iam_role={}'
    json 's3://udacity-dend/log_json_path.json'
    region 'us-west-2';
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.94 ms, sys: 173 µs, total: 5.11 ms
Wall time: 4.79 s


In [None]:
%%sql



### 3.2 Analytic tables:

In [10]:
%%sql
CREATE SCHEMA IF NOT EXISTS analyst_nodist;
SET search_path TO analyst_nodist;

DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS time;
DROP TABLE IF EXISTS songplays;


CREATE TABLE IF NOT EXISTS users 
(
    user_id varchar,
    first_name varchar,
    last_name varchar,
    gender varchar,
    level varchar,
    PRIMARY KEY (user_id)
);

CREATE TABLE IF NOT EXISTS songs (
    song_id varchar, 
    title varchar, 
    artist_id varchar, 
    year int, 
    duration real, 
    PRIMARY KEY (song_id) 
);

CREATE TABLE IF NOT EXISTS artists (
    artist_id varchar, 
    name varchar, 
    location varchar, 
    latitude real, 
    longitude real, 
    PRIMARY KEY (artist_id) 
);

CREATE TABLE IF NOT EXISTS time (
    start_time varchar, 
    hour varchar, 
    day varchar, -- Day of month
    week varchar, -- Week of year
    month varchar, 
    year varchar, 
    weekday varchar, 
    PRIMARY KEY (start_time) 
);

CREATE TABLE IF NOT EXISTS songplays (
    songplay_id bigint IDENTITY(0,1), 
    start_time varchar, 
    user_id varchar, 
    level varchar, 
    song_id varchar, 
    artist_id varchar, 
    session_id varchar, 
    location varchar, 
    user_agent varchar,
    PRIMARY KEY (songplay_id) 
);

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

### 3.3 ETL from staging tables into analytic tables:

#### 3.3.1 Users table

In [9]:
%%sql
BEGIN;

--  insert users table having duplicate row
INSERT into analyst_nodist.users (user_id, first_name, last_name, gender, level)
SELECT userId, firstName, lastName, gender, level
FROM staging.log_dataset
WHERE userId IS NOT NULL;

-- Star remove the duplicate rows
-- First identify the rows that are duplicate
CREATE TEMP TABLE duplicate_user_id AS
SELECT user_id
FROM analyst_nodist.users
GROUP BY user_id
HAVING COUNT(*) > 1;

-- Extract one copy of all the duplicate rows
CREATE TEMP TABLE new_users(LIKE analyst_nodist.users);

INSERT into new_users
SELECT DISTINCT *
FROM analyst_nodist.users
WHERE user_id IN(
     SELECT user_id
     FROM duplicate_user_id
);

-- Remove rows that were duplicated (all copies).
DELETE FROM analyst_nodist.users
WHERE user_id IN(
     SELECT user_id
     FROM duplicate_user_id
);

-- Insert back into the single copies
INSERT into analyst_nodist.users
SELECT *
FROM new_users;

-- Cleanup
DROP TABLE duplicate_user_id;
DROP TABLE new_users;
COMMIT;

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
15 rows affected.
Done.
Done.
2 rows affected.
12 rows affected.
2 rows affected.
Done.
Done.
Done.


[]

### 3..3.2 songs table

In [22]:
%%sql
BEGIN;

--  insert songs table MAY have duplicate row
INSERT into analyst_nodist.songs (song_id, title, artist_id, year, duration) 
SELECT song_id, title, artist_id, convert(integer, year), convert(real, duration)
FROM staging.song_dataset;

-- Star remove the duplicate rows
-- First identify the rows that are duplicate
CREATE TEMP TABLE duplicate_song_id AS
SELECT song_id
FROM analyst_nodist.songs
GROUP BY song_id
HAVING COUNT(*) > 1;

-- Extract one copy of all the duplicate rows
CREATE TEMP TABLE new_songs(LIKE analyst_nodist.songs);

INSERT into new_songs
SELECT DISTINCT *
FROM analyst_nodist.songs
WHERE song_id IN(
     SELECT song_id
     FROM duplicate_song_id
);

-- Remove rows that were duplicated (all copies).
DELETE FROM analyst_nodist.songs
WHERE song_id IN(
     SELECT song_id
     FROM duplicate_song_id
);

-- Insert back into the single copies
INSERT into analyst_nodist.songs
SELECT *
FROM new_songs;

-- Cleanup
DROP TABLE duplicate_song_id;
DROP TABLE new_songs;

COMMIT;

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
1 rows affected.
Done.
Done.
0 rows affected.
0 rows affected.
0 rows affected.
Done.
Done.
Done.


[]

### 3.3.3 Artists table:

In [None]:
%%sql
BEGIN;

--  insert artists table MAY have duplicate row
INSERT into analyst_nodist.artists (artist_id, name, location, latitude, longitude) 
SELECT artist_id, artist_name, artist_location, convert(real, artist_latitude), convert(real, artist_longitude)
FROM staging.song_dataset;

-- Star remove the duplicate rows
-- First identify the rows that are duplicate
CREATE TEMP TABLE duplicate_artist_id AS
SELECT artist_id
FROM analyst_nodist.artists
GROUP BY artist_id
HAVING COUNT(*) > 1;

-- Extract one copy of all the duplicate rows
CREATE TEMP TABLE new_artists(LIKE analyst_nodist.artists);

INSERT into new_artists
SELECT DISTINCT *
FROM analyst_nodist.songs
WHERE artist_id IN(
     SELECT artist_id
     FROM duplicate_artist_id
);

-- Remove rows that were duplicated (all copies).
DELETE FROM analyst_nodist.artists
WHERE artist_id IN(
     SELECT artist_id
     FROM duplicate_artist_id
);

-- Insert back into the single copies
INSERT into analyst_nodist.artists
SELECT *
FROM new_artists;

-- Cleanup
DROP TABLE duplicate_artist_id;
DROP TABLE new_artists;

COMMIT;

### 3.3.4 Time table:

In [73]:
%%sql
BEGIN;

-- Create TEMP TABLE temp_timestamp to convert ts epoch to timestamp
DROP TABLE IF EXISTS temp_timestamp;

CREATE TEMP TABLE temp_timestamp AS
SELECT timestamp with time zone 'epoch' + convert(BIGINT, ts)/1000 * interval '1 second' AS timestamp
FROM staging.log_dataset;

--  insert artists table MAY have duplicate row
INSERT into analyst_nodist.time (start_time, hour, day, week, month, year, weekday) 
SELECT stag.ts,
to_char (temp.timestamp, 'HH24'),
to_char (temp.timestamp, 'DD'),
to_char (temp.timestamp, 'WW'),
to_char (temp.timestamp, 'MM'),
to_char (temp.timestamp, 'YYYY'),
to_char (temp.timestamp, 'Day')
FROM staging.log_dataset stag, temp_timestamp temp;

-- Star remove the duplicate rows
-- First identify the rows that are duplicate
CREATE TEMP TABLE duplicate_start_time AS
SELECT start_time
FROM analyst_nodist.time
GROUP BY start_time
HAVING COUNT(*) > 1;

-- Extract one copy of all the duplicate rows
CREATE TEMP TABLE new_time(LIKE analyst_nodist.time);

INSERT into new_time
SELECT DISTINCT *
FROM analyst_nodist.time
WHERE start_time IN(
     SELECT start_time
     FROM duplicate_start_time
);

-- Remove rows that were duplicated (all copies).
DELETE FROM analyst_nodist.time
WHERE start_time IN(
     SELECT start_time
     FROM duplicate_start_time
);

-- Insert back into the single copies
INSERT into analyst_nodist.time
SELECT *
FROM new_time;

-- Cleanup
DROP TABLE duplicate_start_time;
DROP TABLE new_time;
DROP TABLE temp_timestamp;

COMMIT;

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
225 rows affected.
Done.
Done.
42 rows affected.
450 rows affected.
42 rows affected.
Done.
Done.
Done.
Done.


[]

### 3.3.5 songplays table:

In [14]:
%%sql
BEGIN;

INSERT into songplays( 
    start_time, user_id, level,
    song_id, artist_id,
    session_id, location, user_agent)
SELECT log_dataset.ts, log_dataset.userId, log_dataset.level,
song_dataset.song_id, song_dataset.artist_id,
log_dataset.sessionId, log_dataset.location, log_dataset.userAgent
FROM staging.log_dataset log_dataset LEFT JOIN staging.song_dataset song_dataset
ON log_dataset.artist = song_dataset.artist_name
AND log_dataset.song =  song_dataset.title
AND log_dataset.length =  song_dataset.duration
WHERE log_dataset.page = 'NextSong';

COMMIT;

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
11 rows affected.
Done.


[]

### 3.2 Automate  the copying

- Make sure that the `DWH_ROLE_ARN` is substituted with the correct value in each query
- Perform the data loading twice once for each schema (dist and nodist)
- Collect timing statistics to compare the insertion times

We have scripted the insertion as found below in the function `loadTables` which
returns a pandas dataframe containing timing statistics for the copy operations

```sql
copy song_dataset from 's3://udacity-dend/song_data' 
credentials 'aws_iam_role=<DWH_ROLE_ARN>'
json 'auto'
gzip region 'us-west-2';

```

In [15]:
def loadTables(schema, tables):
    loadTimes = []
    SQL_SET_SCEMA = "SET search_path TO {};".format(schema)
    %sql $SQL_SET_SCEMA
    
    for table in tables:
        SQL_COPY = """
copy {} from 's3://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json'
credentials 'aws_iam_role={}'
JSON 'auto'
region 'us-west-2';
        """.format(table, DWH_ROLE_ARN)

        print("======= LOADING TABLE: ** {} ** IN SCHEMA ==> {} =======".format(table, schema))
        print(SQL_COPY)

        t0 = time()
        %sql $SQL_COPY
        loadTime = time()-t0
        loadTimes.append(loadTime)

        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))
    return pd.DataFrame({"table":tables, "loadtime_"+schema:loadTimes}).set_index('table')

In [10]:
#-- List of the tables to be loaded
tables = ["song_dataset"]

In [16]:
staging = loadTables("staging", tables)

 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.

copy song_dataset from 's3://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json'
credentials 'aws_iam_role=arn:aws:iam::668674852354:role/dwhRole'
JSON 'auto'
region 'us-west-2';
        
 * postgresql://dwhuser:***@dwhcluster.cl33boinwx8d.us-east-1.redshift.amazonaws.com:5439/dwh
Done.
=== DONE IN: 14.11 sec

