In [1]:
import psycopg2
import configparser
from sql_queries import *

In [2]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

# Echo the [CLUSTER] settings as a sanity check, masking the password:
# notebook output is saved with the file, so printing it leaks the credential.
# Values are printed positionally, so this assumes dwh.cfg lists them in the
# order host, dbname, user, password, port (the order the format string uses).
PASSWORD_POSITION = 3
cluster_settings = [
    '****' if index == PASSWORD_POSITION else value
    for index, value in enumerate(config['CLUSTER'].values())
]
print("host={} dbname={} user={} password={} port={}".format(*cluster_settings))

host=siweiredshift.cgrmsb95kk9b.us-west-2.redshift.amazonaws.com dbname=dev user=siwei password=**** port=5439


In [3]:
# Cluster / S3 settings read from dwh.cfg.
DWH_ROLE_ARN = config.get('IAM_ROLE', 'ARN')
LOG_DATA = config.get('S3', 'LOG_DATA')
LOG_JSONPATH = config.get('S3', 'LOG_JSONPATH')
SONG_DATA = config.get('S3', 'SONG_DATA')
HOST = config.get('CLUSTER', 'HOST')

staging_songs_table_drop = ("""
DROP TABLE IF EXISTS staging_songs_table
""")

# Staging table for the raw song JSON.
# `duration` is DOUBLE PRECISION because an unqualified DECIMAL in Redshift is
# DECIMAL(18, 0), which would round track lengths to whole seconds.
# NOTE(review): COPY ... json 'auto' matches JSON keys to column names, so
# `artist_lattitude` only loads if the source files use that exact spelling —
# confirm against the song data (artist_table_insert reads this column name).
staging_songs_table_create = ("""
CREATE TABLE IF NOT EXISTS staging_songs_table
(num_songs INTEGER,
artist_id VARCHAR NOT NULL,
artist_lattitude DOUBLE PRECISION,
artist_longitude DOUBLE PRECISION,
artist_location VARCHAR,
artist_name VARCHAR,
song_id VARCHAR PRIMARY KEY,
title VARCHAR NOT NULL,
duration DOUBLE PRECISION NOT NULL,
year SMALLINT
)
""")

# Bulk-load the song JSON at SONG_DATA using the cluster's IAM role.
# SONG_DATA and DWH_ROLE_ARN are interpolated verbatim, so the config values
# must already carry whatever SQL quoting COPY expects — verify in dwh.cfg.
staging_songs_copy = ("""
copy staging_songs_table from {}
iam_role  {}
json 'auto'
""").format(SONG_DATA, DWH_ROLE_ARN)

### Set up connection parameters for the Redshift cluster

In [4]:

# libpq connection string for the Redshift cluster, built from dwh.cfg so no
# credentials are hard-coded in the notebook. The [CLUSTER] values are used
# positionally, in the same host, dbname, user, password, port order that the
# config cell prints them in.
aws_db_paremeters = "host={} dbname={} user={} password={} port={}".format(
    *config['CLUSTER'].values()
)

### Copy staging_songs_table from S3

In [31]:
# Open a fresh connection here; this cell previously relied on a `conn` left
# over from an earlier (deleted) cell and failed on a fresh kernel.
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Recreate the staging table and load it from S3 in one transaction.
    for statement in (staging_songs_table_drop,
                      staging_songs_table_create,
                      staging_songs_copy):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if the kernel is interrupted mid-COPY.
    conn.close()

### Drop, Create and Insert data into time_table

In [46]:
# Rebuild statements for the time dimension, derived from NextSong events.
time_table_drop = "DROP TABLE IF EXISTS time_table"

time_table_create = ("""
CREATE TABLE IF NOT EXISTS time_table
(start_time_stamp INT8 PRIMARY KEY,
start_time TIMESTAMP,
hour SMALLINT,
day SMALLINT,
week SMALLINT,
month SMALLINT,
year SMALLINT,
weekday SMALLINT
)
""")

# One row per distinct event timestamp. The query treats `ts` as epoch
# milliseconds (ts/1000 seconds after 'epoch'), and the EXTRACT calls reuse the
# `start_time` alias defined earlier in the same select list.
# DISTINCT matters: Redshift does not enforce PRIMARY KEY, and duplicate
# timestamps here would multiply rows in the songplay join on start_time_stamp.
time_table_insert = ("""
INSERT INTO time_table
(start_time_stamp, start_time, hour, day, week, month, year, weekday)
(SELECT DISTINCT
ts,
TIMESTAMP 'epoch' + ts/1000 *INTERVAL '1 second' as start_time,
extract (hour from start_time),
extract (day from start_time),
extract (week from start_time),
extract (month from start_time),
extract (year from start_time),
extract (dow from start_time)
FROM staging_events_table
WHERE page = 'NextSong')
""")

In [47]:
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Drop, recreate and repopulate the time dimension in one transaction.
    for statement in (time_table_drop, time_table_create, time_table_insert):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if cursor() fails or the kernel is interrupted.
    conn.close()

### Drop, Create and Insert songplay_table

In [59]:
# Rebuild statements for the songplay fact table.
# The drop is defined here, like the other table cells, so the execution cell
# does not depend on a name pulled in by `from sql_queries import *` (which may
# target a different table name).
songplay_table_drop = "DROP TABLE IF EXISTS songplay_table"

# One row per staging event that has a time_table row with the same ts and a
# staged song matching on title + artist name. The TIMESTAMP t.start_time is
# stored in the column named start_time_stamp.
songplay_table_insert = ("""
INSERT INTO songplay_table
(start_time_stamp, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT 
t.start_time,
se.userId,
se.level,
ss.song_id,
ss.artist_id,
se.sessionId,
se.location,
se.userAgent
FROM staging_events_table se
JOIN time_table t
ON se.ts = t.start_time_stamp
JOIN staging_songs_table ss
ON (se.song = ss.title) AND (se.artist = ss.artist_name)
""")

# songplay_id is generated by Redshift via IDENTITY(0, 1).
songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplay_table
(songplay_id INT IDENTITY(0, 1) PRIMARY KEY,
start_time_stamp TIMESTAMP NOT NULL,
user_id INTEGER NOT NULL,
level VARCHAR,
song_id VARCHAR,
artist_id VARCHAR,
session_id INTEGER,
location VARCHAR,
user_agent VARCHAR
)
""")

In [60]:
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Drop, recreate and repopulate the songplay fact table in one transaction.
    for statement in (songplay_table_drop, songplay_table_create, songplay_table_insert):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if cursor() fails or the kernel is interrupted.
    conn.close()

### Drop, Create and Insert user_table

In [7]:
# Rebuild statements for the user dimension.
# The drop is defined here so the execution cell drops exactly the table this
# notebook creates, rather than relying on `from sql_queries import *`.
user_table_drop = "DROP TABLE IF EXISTS user_table"

user_table_create = ("""
CREATE TABLE IF NOT EXISTS user_table
(user_id INTEGER PRIMARY KEY,
first_name VARCHAR,
last_name VARCHAR,
gender CHAR(1),
level CHAR(10)
)
""")

# Keep exactly one row per user: the one from their most recent event
# (highest ts). Inserting every event and de-duplicating with DISTINCT keeps
# one row per (user, level) pair, so a user whose level changed would appear
# twice even though user_id is declared as the PRIMARY KEY.
user_table_insert = ("""
INSERT INTO user_table
(user_id, first_name, last_name, gender, level)
SELECT userId, firstName, lastName, gender, level
FROM (
    SELECT userId, firstName, lastName, gender, level,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS recency_rank
    FROM staging_events_table
    WHERE userId IS NOT NULL
) ranked_events
WHERE recency_rank = 1;
""")

# Rebuild user_table from its DISTINCT rows via a temp-table swap. After the
# ranked insert above this should not remove anything; it is kept so the
# execution cell that runs it keeps working.
user_table_distinct = ("""
BEGIN;
CREATE TABLE temp1 AS SELECT DISTINCT * FROM user_table;
ALTER TABLE user_table RENAME TO temp2;
ALTER TABLE temp1 RENAME TO user_table;
DROP TABLE temp2;
COMMIT;
""")

In [8]:
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Drop, recreate, populate, then de-duplicate the user dimension.
    # NOTE(review): user_table_distinct carries its own BEGIN/COMMIT, so the
    # work may already be committed before conn.commit() runs.
    for statement in (user_table_drop, user_table_create,
                      user_table_insert, user_table_distinct):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if cursor() fails or the kernel is interrupted.
    conn.close()

### Drop, Create and Insert song_table

In [5]:
# Rebuild statements for the song dimension, loaded from the staged song JSON.
song_table_drop = "DROP TABLE IF EXISTS song_table"

# `duration` is DOUBLE PRECISION: an unqualified DECIMAL in Redshift is
# DECIMAL(18, 0) and would round track lengths to whole seconds.
song_table_create = ("""
CREATE TABLE IF NOT EXISTS song_table
(song_id VARCHAR PRIMARY KEY,
title VARCHAR NOT NULL,
artist_id VARCHAR,
year INTEGER,
duration DOUBLE PRECISION NOT NULL
)
""")

# Copy the song attributes straight from the staging table.
song_table_insert = ("""
INSERT INTO song_table
(song_id, title, artist_id, year, duration)
SELECT song_id, title, artist_id, year, duration
FROM staging_songs_table
""")


In [6]:
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Drop, recreate and repopulate the song dimension in one transaction.
    for statement in (song_table_drop, song_table_create, song_table_insert):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if cursor() fails or the kernel is interrupted.
    conn.close()

### Drop, Create and Insert artist_table

In [10]:
# Rebuild statements for the artist dimension, taken from the staged song JSON.
artist_table_drop = "DROP TABLE IF EXISTS artist_table"

artist_table_create = """
CREATE TABLE IF NOT EXISTS artist_table
(artist_id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
location VARCHAR,
lattitude double precision,
longitude double precision
)
"""

# One row per staged song; an artist with several songs is inserted several
# times and collapsed by artist_table_distinct below.
artist_table_insert = """
INSERT INTO artist_table
(artist_id, name, location, lattitude, longitude)
SELECT artist_id, artist_name, artist_location, artist_lattitude, artist_longitude
FROM staging_songs_table
"""

# Replace artist_table with its DISTINCT rows via a temp-table swap.
# NOTE(review): DISTINCT * only merges fully identical rows; an artist_id whose
# name or location differs between songs will still appear more than once.
artist_table_distinct = """
BEGIN;
CREATE TABLE temp1 AS SELECT DISTINCT * FROM artist_table;
ALTER TABLE artist_table RENAME TO temp2;
ALTER TABLE temp1 RENAME TO artist_table;
DROP TABLE temp2;
COMMIT;
"""



In [11]:
conn = psycopg2.connect(aws_db_paremeters)
try:
    cur = conn.cursor()
    # Drop, recreate, populate, then de-duplicate the artist dimension.
    # NOTE(review): artist_table_distinct carries its own BEGIN/COMMIT, so the
    # work may already be committed before conn.commit() runs.
    for statement in (artist_table_drop, artist_table_create,
                      artist_table_insert, artist_table_distinct):
        cur.execute(statement)
    conn.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook continue.
    print(e)
finally:
    # Release the connection even if cursor() fails or the kernel is interrupted.
    conn.close()