# Parallel ETL

In [2]:
import configparser
from time import time

import boto3
import matplotlib.pyplot as plt
import pandas as pd
import psycopg2

## STEP 1: Get the parameters of the created Redshift cluster

In [3]:
config = configparser.ConfigParser()
# Parse the config inside a context manager so the file handle is closed
# as soon as parsing finishes instead of staying open for the kernel's life.
with open('dwh_crshc.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS key pair; used below to build the boto3 S3 resource.
KEY = config.get('AWS', 'key')
SECRET = config.get('AWS', 'secret')

# Redshift database connection parameters passed to psycopg2.connect.
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

In [4]:
# Redshift cluster endpoint: host name only, since port and database are passed
# separately when connecting. Replace it with your own cluster's endpoint,
# for example "redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com".
DWH_ENDPOINT = "dwhcluster.ckoszxyq4rgx.us-west-2.redshift.amazonaws.com"

# IAM role ARN that the S3 COPY commands hand to Redshift via
# `credentials 'aws_iam_role=...'`; read from the [IAM_ROLE] config section.
DWH_ROLE_ARN = config.get("IAM_ROLE", "ARN")

## STEP 2: Connect to the Redshift Cluster

In [5]:
%%time
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(DWH_ENDPOINT, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT))
cur = conn.cursor()
print(conn)

<connection object at 0x7fcd854122a8; dsn: 'user=dwhuser password=xxx dbname=dwh host=dwhcluster.ckoszxyq4rgx.us-west-2.redshift.amazonaws.com port=5439', closed: 0>
CPU times: user 1.66 ms, sys: 3.62 ms, total: 5.27 ms
Wall time: 431 ms


In [5]:
# Exploratory listing of the objects under the "log" prefix of the
# udacity-dend bucket, authenticated with the AWS key pair from the config
# file. (boto3 is imported in the top imports cell.)
s3 = boto3.resource('s3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

sampleDbBucket = s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="log"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

## STEP 3: Create Staging Tables

In [64]:
%%time
cur.execute("""
DROP TABLE IF EXISTS "staging_logs"; 
DROP TABLE IF EXISTS "staging_songs"; 
CREATE TABLE "staging_logs" ("artist" varchar,
                             "auth" varchar,
                             "firstName" varchar,
                             "gender" varchar,
                             "itemInSession" varchar,
                             "lastName" varchar,
                             "length" float,
                             "level" varchar,
                             "location" varchar,
                             "method" varchar,
                             "page" varchar,
                             "registration" float,
                             "sessionId" int,
                             "song" varchar,
                             "status" varchar,
                             "ts" bigint,
                             "userAgent" varchar,
                             "userId" int);


CREATE TABLE IF NOT EXISTS "staging_songs" ("artist_id" varchar,
                                            "artist_latitude" real,
                                            "artist_location" varchar,
                                            "artist_longitude" real,
                                            "artist_name" varchar,
                                            "duration" float,
                                            "num_songs" int,
                                            "song_id" varchar,
                                            "title" varchar,
                                            "year" int);
""");

CPU times: user 0 ns, sys: 1.38 ms, total: 1.38 ms
Wall time: 126 ms


## STEP 4: Copy S3 Data To Staging

In [223]:
%%time
cur.execute("""
copy staging_logs 
from 's3://udacity-dend/log_data' 
credentials 'aws_iam_role={}'
format as json 's3://udacity-dend/log_json_path.json'
compupdate off
emptyasnull
blanksasnull
region 'us-west-2';
""".format(DWH_ROLE_ARN))

CPU times: user 1.41 ms, sys: 233 µs, total: 1.64 ms
Wall time: 629 ms


In [224]:
%%time
cur.execute("""
copy staging_songs 
from 's3://udacity-dend/song_data' 
credentials 'aws_iam_role={}'
format as json 'auto'
compupdate off
emptyasnull
blanksasnull
region 'us-west-2';
""".format(DWH_ROLE_ARN))

CPU times: user 1.83 ms, sys: 303 µs, total: 2.14 ms
Wall time: 1min 36s


In [225]:
# Drop staging rows missing a key column that the star-schema load relies on,
# one DELETE per (table, column) pair, then commit them together.
required_columns = [
    ("staging_logs", "userId"),
    ("staging_logs", "sessionId"),
    ("staging_logs", "ts"),
    ("staging_songs", "song_id"),
    ("staging_songs", "artist_id"),
]
for table_name, column_name in required_columns:
    cur.execute("DELETE FROM {} WHERE {} IS NULL;".format(table_name, column_name))
conn.commit()

## STEP 5: Create Tables for Star Schema DB

In [55]:
%%time
cur.execute("""
DROP TABLE IF EXISTS "songplays";
DROP TABLE IF EXISTS "users";
DROP TABLE IF EXISTS "songs";
DROP TABLE IF EXISTS "artist";
DROP TABLE IF EXISTS "time";

CREATE TABLE IF NOT EXISTS "users" ("user_id" int NOT NULL,
                                    "first_name" varchar,
                                    "last_name" varchar,
                                    "gender" varchar(1),
                                    "level" varchar(4),
                                    PRIMARY KEY ("user_id"));

CREATE TABLE IF NOT EXISTS "songs" ("song_id" varchar NOT NULL,
                                    "title" varchar,
                                    "artist_id" varchar,
                                    "year" int,
                                    "duration" float,
                                    PRIMARY KEY ("song_id"));

CREATE TABLE IF NOT EXISTS "artists" ("artist_id" varchar NOT NULL,
                                      "name" varchar,
                                      "location" varchar,
                                      "latitude" real,
                                      "longitude" real,
                                      PRIMARY KEY ("artist_id"));

CREATE TABLE IF NOT EXISTS "time" ("start_time" timestamp NOT NULL,
                                   "hour" int,
                                   "day" int,
                                   "week" int,
                                   "month" int,
                                   "year" int,
                                   "weekday" int,
                                   PRIMARY KEY ("start_time"));

CREATE TABLE IF NOT EXISTS songplays (songplay_id bigint NOT NULL, 
                                        start_time timestamp NOT NULL,
                                        user_id int NOT NULL,
                                        level varchar(4) NOT NULL,
                                        song_id varchar,
                                        artist_id varchar,
                                        session_id int NOT NULL,
                                        location varchar NOT NULL,
                                        user_agent varchar NOT NULL,
                                        PRIMARY KEY (songplay_id),
                                        FOREIGN KEY (song_id) REFERENCES songs (song_id),
                                        FOREIGN KEY (artist_id) REFERENCES artists (artist_id),
                                        FOREIGN KEY (start_time) REFERENCES time (start_time),
                                        FOREIGN KEY (user_id) REFERENCES users (user_id),
                                        UNIQUE (start_time, user_id, session_id));
""")
conn.commit()

CPU times: user 921 µs, sys: 141 µs, total: 1.06 ms
Wall time: 258 ms


## STEP 6: Load Data into the Star Schema Tables

In [56]:
%%time
#Load Users Table
cur.execute("""
CREATE TABLE IF NOT EXISTS "users_staging" ("user_id" int NOT NULL,
                                    "first_name" varchar,
                                    "last_name" varchar,
                                    "gender" varchar(1),
                                    "level" varchar(4),
                                    "start_time" bigint,
                                    PRIMARY KEY ("user_id"));

INSERT INTO users_staging 
    SELECT DISTINCT 
        CAST(userId AS int) AS user_id, 
        firstName AS first_name, 
        lastName AS last_name, 
        gender, 
        level,
        ts AS "start_time"
FROM staging_logs;

INSERT INTO users 
    SELECT st.user_id, st.first_name, st.last_name, st.gender, st.level
FROM (SELECT st.*,
             ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY start_time DESC) seqnum
      FROM users_staging st) st
LEFT JOIN users u
ON st.user_id = u.user_id
WHERE seqnum = 1 AND u.user_id IS NULL;

UPDATE users
SET level = st.level
FROM (SELECT st.*,
                ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY start_time DESC) seqnum
        FROM users_staging st) st
JOIN users u
ON st.user_id = u.user_id
WHERE seqnum = 1 
AND (u.user_id = st.user_id AND u.first_name = st.first_name AND u.last_name = st.last_name AND u.gender = st.gender AND u.level <> st.level);
""")
conn.commit()

CPU times: user 869 µs, sys: 134 µs, total: 1 ms
Wall time: 760 ms


In [57]:
%%time
#Load Song Table
cur.execute("""
CREATE TEMP TABLE songs_staging (LIKE songs);

INSERT INTO songs_staging 
    SELECT DISTINCT song_id, title, artist_id, year, duration
FROM staging_songs;

INSERT INTO songs 
    SELECT st.song_id, st.title, st.artist_id, st.year, st.duration
FROM (SELECT st.*,
             ROW_NUMBER() OVER (PARTITION BY song_id, title, artist_id, year, duration ORDER BY song_id, title, artist_id, year, duration DESC) seqnum
      FROM songs_staging st) st
LEFT JOIN songs s
ON st.song_id = s.song_id
WHERE seqnum = 1 AND s.song_id IS NULL;   

DROP TABLE songs_staging;
""")
conn.commit()

CPU times: user 1.55 ms, sys: 239 µs, total: 1.79 ms
Wall time: 550 ms


In [58]:
%%time
#Load Artists Table
cur.execute("""
CREATE TEMP TABLE artists_staging (LIKE artists);

INSERT INTO artists_staging 
    SELECT DISTINCT 
        artist_id, 
        artist_name AS name, 
        artist_location AS location, 
        artist_latitude AS latitude, 
        artist_longitude AS longitude
FROM staging_songs;

INSERT INTO artists 
    SELECT st.artist_id, st.name, st.location, st.latitude, st.longitude
FROM (SELECT st.*,
             ROW_NUMBER() OVER (PARTITION BY artist_id, name, location, latitude, longitude ORDER BY artist_id, name, location, latitude, longitude DESC) seqnum
      FROM artists_staging st) st
LEFT JOIN artists a
ON st.artist_id = a.artist_id
WHERE seqnum = 1 AND a.artist_id IS NULL;    

DROP TABLE artists_staging;
""")
conn.commit()

CPU times: user 1.67 ms, sys: 0 ns, total: 1.67 ms
Wall time: 388 ms


In [59]:
%%time
#Load Time Table
cur.execute("""
CREATE TEMP TABLE time_staging (LIKE time);

INSERT INTO time_staging 
    SELECT DISTINCT 
        DATEADD(ms, ts,'1970-1-1') AS start_time, 
        DATE_PART(hour, DATEADD(ms, ts,'1970-1-1')) AS hour, 
        DATE_PART(day, DATEADD(ms, ts,'1970-1-1')) AS day, 
        DATE_PART(week, DATEADD(ms, ts,'1970-1-1')) AS week, 
        DATE_PART(month, DATEADD(ms, ts,'1970-1-1')) AS month, 
        DATE_PART(year, DATEADD(ms, ts,'1970-1-1')) AS year, 
        DATE_PART(weekday, DATEADD(ms, ts,'1970-1-1')) AS weekday
FROM staging_logs;

INSERT INTO time 
    SELECT st.start_time, st.hour, st.week, st.month, st.year, st.weekday
FROM (SELECT st.*,
             ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY start_time DESC) seqnum
      FROM time_staging st) st
LEFT JOIN time t
ON st.start_time = t.start_time
WHERE seqnum = 1 AND t.start_time IS NULL;

DROP TABLE time_staging;
""")
conn.commit()

CPU times: user 0 ns, sys: 1.95 ms, total: 1.95 ms
Wall time: 569 ms


In [60]:
%%time
#Load Songplays Table
cur.execute("""
CREATE TEMP TABLE songplays_staging (songplay_id bigint NOT NULL, 
                                        start_time timestamp NOT NULL,
                                        user_id int NOT NULL,
                                        level varchar(4) NOT NULL,
                                        song_id varchar,
                                        artist_id varchar,
                                        session_id int NOT NULL,
                                        location varchar NOT NULL,
                                        user_agent varchar NOT NULL,
                                        song varchar,
                                        artist varchar,
                                        length float,
                                        ts bigint,
                                        concat_id varchar NOT NULL);

CREATE TEMP TABLE songplays_staging1 (song_id varchar NOT NULL,
                                        song varchar NOT NULL,
                                        artist_id varchar NOT NULL,
                                        artist varchar NOT NULL,
                                        length float NOT NULL,
                                        concat_id varchar NOT NULL);
                                        
INSERT INTO songplays_staging
SELECT DISTINCT
    1, 
    DATEADD(ms, ts,'1970-1-1') AS start_time, 
    userid AS user_id, 
    level, 
    'None', 
    'None', 
    sessionid AS session_id, 
    location, 
    useragent AS user_agent,
    song,
    artist,
    length,
    ts,
    'None'
FROM staging_logs;

UPDATE songplays_staging
SET concat_id = song || artist || CAST(length AS text)
WHERE (song || artist || CAST(length AS text)) IS NOT NULL;

INSERT INTO songplays_staging1
SELECT 
    s.song_id, 
    s.title AS song, 
    a.artist_id, 
    a.name AS artist, 
    s.duration AS length,
    (SELECT song || artist || CAST(length AS text)) AS concat_id
FROM songs s 
JOIN artists a ON s.artist_id = a.artist_id;

UPDATE songplays_staging
SET song_id = st1.song_id,
    artist_id = st1.artist_id
FROM songplays_staging st
    JOIN songplays_staging1 st1
        ON st.concat_id = st1.concat_id;
        
INSERT INTO songplays 
SELECT CAST((ROW_NUMBER() OVER (ORDER BY st.ts) || st.ts) AS bigint) AS songplay_id, 
    st.start_time, 
    st.user_id, 
    st.level, 
    st.song_id, 
    st.artist_id, 
    st.session_id, 
    st.location, 
    st.user_agent
FROM (SELECT st.*,
             ROW_NUMBER() OVER (PARTITION BY ts, 
                                             user_id, 
                                             level, 
                                             song_id, 
                                             artist_id, 
                                             location, 
                                             song, 
                                             artist, 
                                             length,
                                             user_agent ORDER BY ts, 
                                                             user_id, 
                                                             level, 
                                                             song_id, 
                                                             artist_id, 
                                                             location, 
                                                             song, 
                                                             artist, 
                                                             length,
                                                             user_agent DESC) seqnum
      FROM songplays_staging st) st
LEFT JOIN songplays s
ON st.songplay_id = s.songplay_id
WHERE seqnum = 1 AND s.songplay_id IS NULL;

DROP TABLE IF EXISTS songplays_staging;
DROP TABLE IF EXISTS songplays_staging1;
""")
conn.commit()

CPU times: user 653 µs, sys: 100 µs, total: 753 µs
Wall time: 6.41 s


In [41]:


%%time
#Load Songplays Table
cur.execute("""
DROP TABLE IF EXISTS songplays_staging;
DROP TABLE IF EXISTS songplays_staging1;
DROP TABLE artists_staging;
""")
conn.commit()

CPU times: user 1.65 ms, sys: 0 ns, total: 1.65 ms
Wall time: 170 ms


In [6]:
# Commit any work still pending on the shared Redshift connection.
conn.commit()

In [12]:
%%time
cur.execute("SELECT * FROM artists;")
colnames = [desc[0] for desc in cur.description]
colnames
    
# extract data from staging_logs file
cur.execute("SELECT * FROM artists;")
data=cur.fetchall()
    
# create dataframe
df_test = pd.DataFrame(data,columns=colnames) 

CPU times: user 31.9 ms, sys: 7.73 ms, total: 39.6 ms
Wall time: 3.79 s


In [13]:
# Show only the first rows; the full artists table is far too large to render.
df_test.head(10)

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR00B1I1187FB433EB,Eagle-Eye Cherry,"Stockholm, Sweden",,
1,AR00DG71187B9B7FCB,Basslovers United,,,
2,AR00FVC1187FB5BE3E,Panda,"Monterrey, NL, México",25.67084,-100.30953
3,AR00JIO1187B9A5A15,Saigon,Brooklyn,40.65507,-73.94888
4,AR00LNI1187FB444A5,Bruce BecVar,,,
5,AR00MQ31187B9ACD8F,Chris Carrier,,,
6,AR00TGQ1187B994F29,Paula Toller,,,
7,AR00Y9I1187B999412,Akercocke,,,
8,AR00YYQ1187FB504DC,God Is My Co-Pilot,"New York, NY",40.71455,-74.00712
9,AR016P51187B98E398,Indian Ropeman,,,


In [9]:
# Final commit of any pending work on the shared Redshift connection.
conn.commit()