In [None]:
import configparser
from urllib.parse import quote_plus

import boto3
import pandas as pd

In [None]:
# Load the `sql` IPython extension; the %sql / %%sql magics used in later cells come from it.
%load_ext sql

#### Connect to Redshift

In [None]:
# Read credentials and cluster settings from the local dwh.cfg file.
config = configparser.ConfigParser()
# A context manager closes the file handle as soon as parsing is done;
# a missing file still raises FileNotFoundError, exactly as before.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS access key pair (used for the S3 client further down).
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# Redshift settings: database, login, port, endpoint, and the IAM role
# that the COPY commands pass to Redshift.
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')
DWH_ROLE_ARN = config.get('DWH', 'DWH_ROLE_ARN')
DWH_ENDPOINT = config.get('DWH', 'DWH_ENDPOINT')

In [None]:
#postgresql://username:password@host:port/database
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)

%sql $conn_string

#### Connect to the S3 bucket and list the data files by prefix

In [None]:
# S3 resource handle authenticated with the key pair read from dwh.cfg.
s3 = boto3.resource(
    's3',
    region_name='us-east-2',  # replace with your cluster region
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

DbBucket = s3.Bucket("dwh-training")

# Print every object under the song-data prefix, then every object under
# the log-data prefix.
for data_prefix in ("data/song_data/A/", "data/log_data/2018/11/"):
    for s3_obj in DbBucket.objects.filter(Prefix=data_prefix):
        print(s3_obj)

#### Create staging tables

In [None]:
%%sql

SET search_path TO public;

-- search_path is set before the DROPs so they always target the schema the
-- staging tables are created in, even when a later cell has already switched
-- the session to another schema and this cell is being re-run.
DROP TABLE IF EXISTS staging_event;
DROP TABLE IF EXISTS staging_song;

-- Staging table for the raw event log records loaded by COPY.
-- NOTE(review) - column order presumably has to match log_jsonpath.json,
-- confirm before reordering anything here.
CREATE TABLE IF NOT EXISTS staging_event (
    artist          VARCHAR,
    auth            VARCHAR,
    firstName       VARCHAR,
    gender          TEXT,
    itemInSession   INTEGER,
    lastName        VARCHAR,
    length          FLOAT,
    level           TEXT,
    location        VARCHAR,
    method          TEXT,
    page            TEXT,
    registration    FLOAT,
    sessionId       INTEGER,
    song            VARCHAR,
    status          INTEGER,
    ts              BIGINT,
    userAgent       VARCHAR,
    userId          VARCHAR
);

-- Staging table for the raw song metadata records loaded by COPY.
CREATE TABLE IF NOT EXISTS staging_song (
    num_songs          INTEGER,
    artist_id          VARCHAR(100),
    artist_latitude    FLOAT,
    artist_longitude   FLOAT,
    artist_location    VARCHAR(100),
    artist_name        VARCHAR(100),
    song_id            VARCHAR(50),
    title              VARCHAR(100),
    duration           FLOAT,
    year               INTEGER
);

#### Insert into staging tables using the COPY command

In [None]:
copy_staging_event = ("""COPY staging_event FROM 's3://dwh-training/data/log_data/2018/11/'
    credentials 'aws_iam_role={}' region 'us-east-2' 
    JSON 's3://dwh-training/data/log_jsonpath.json';""").format(DWH_ROLE_ARN)

copy_staging_song = ("""COPY staging_song FROM 's3://dwh-training/data/song_data/A/'
    credentials 'aws_iam_role={}' region 'us-east-2' JSON 'auto';""").format(DWH_ROLE_ARN) 

%sql $copy_staging_event
%sql $copy_staging_song

#### Create the Fact and Dimension tables

In [None]:
%%sql

CREATE SCHEMA IF NOT EXISTS project;

-- The schema is created above (a no-op when it already exists) because
-- nothing else in this notebook creates it, and CREATE TABLE fails when
-- search_path points at a schema that does not exist.
SET search_path TO project;

-- The fact table goes first, and songs before artists, so no table is
-- dropped while another table still references it.
DROP TABLE IF EXISTS songplays;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS time;

-- Tables are created in dependency order. A REFERENCES clause needs the
-- referenced table to exist already, so the dimensions come first (artists
-- before songs) and the songplays fact table comes last.

-- Artist dimension, replicated to every node.
CREATE TABLE IF NOT EXISTS artists (
    artist_id varchar(100) PRIMARY KEY,
    name varchar(100),
    location varchar(100),
    latitude float,
    longitude float)
    DISTSTYLE all;

-- Song dimension, distributed and sorted by artist_id.
CREATE TABLE IF NOT EXISTS songs (
    song_id varchar(50) PRIMARY KEY,
    title varchar(100),
    artist_id varchar(100) REFERENCES artists(artist_id) sortkey distkey,
    year INTEGER,
    duration FLOAT);

-- User dimension, replicated to every node.
CREATE TABLE IF NOT EXISTS users (
    user_id varchar(50) PRIMARY KEY,
    first_name varchar(50),
    last_name varchar(50),
    gender TEXT,
    level TEXT)
    DISTSTYLE all;

-- Time dimension keyed by the raw event timestamp, replicated to every node.
CREATE TABLE IF NOT EXISTS time (
    start_time bigint PRIMARY KEY,
    hour INTEGER,
    day INTEGER,
    week INTEGER,
    month INTEGER,
    year INTEGER,
    weekday INTEGER)
    DISTSTYLE all;

-- Fact table, one row per song play, sorted by start_time and distributed
-- by song_id.
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id INTEGER IDENTITY(1, 1) PRIMARY KEY,
    start_time bigint NOT NULL REFERENCES time(start_time) sortkey,
    user_id VARCHAR NOT NULL REFERENCES users(user_id),
    level TEXT,
    song_id varchar(50) NOT NULL REFERENCES songs(song_id) distkey,
    artist_id varchar(100) NOT NULL REFERENCES artists(artist_id),
    session_id INTEGER NOT NULL,
    location varchar(200),
    user_agent varchar(225));

#### Insert into fact and dimension tables

In [None]:
%%sql

INSERT INTO users (
    user_id,
    first_name,
    last_name,
    gender,
    level)
-- Redshift does not enforce PRIMARY KEY uniqueness, so the staged events are
-- reduced to one row per user here. The row kept is the one from the most
-- recent event (highest ts), which carries the latest level for that user.
SELECT user_id, first_name, last_name, gender, level
FROM (
    SELECT userId AS user_id,
           firstName AS first_name,
           lastName AS last_name,
           gender,
           level,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS recency_rank
    FROM public.staging_event
    WHERE userId IS NOT NULL
) AS latest_event
WHERE recency_rank = 1
ORDER BY user_id;


-- Songs dimension, copied column for column from the staged song metadata.
INSERT INTO songs (song_id, title, artist_id, year, duration)
SELECT
    SS.song_id,
    SS.title,
    SS.artist_id,
    SS.year,
    SS.duration
FROM public.staging_song AS SS
ORDER BY SS.song_id;


-- Artists dimension. The staging table holds one row per song, so an artist
-- with several songs appears several times. Redshift does not enforce
-- PRIMARY KEY uniqueness, so exactly one row per artist_id is kept here
-- (the one belonging to the lowest song_id, which makes the pick deterministic).
INSERT INTO artists (
    artist_id,
    name,
    location,
    latitude,
    longitude)
SELECT artist_id, name, location, latitude, longitude
FROM (
    SELECT artist_id,
           artist_name AS name,
           artist_location AS location,
           artist_latitude AS latitude,
           artist_longitude AS longitude,
           ROW_NUMBER() OVER (PARTITION BY artist_id ORDER BY song_id) AS artist_rank
    FROM public.staging_song
    WHERE artist_name IS NOT NULL
) AS ranked_artist
WHERE artist_rank = 1
ORDER BY artist_id;


-- Time dimension. ts is divided by 1000 and added as seconds, so it is
-- treated as milliseconds, and the base has to be the Unix epoch
-- (1970-01-01). A base of 1900-01-01 would place every event 70 years too
-- early. DISTINCT keeps one row per timestamp because Redshift does not
-- enforce PRIMARY KEY uniqueness, and rows without a ts are skipped since
-- start_time is the key.
INSERT INTO time (
    start_time,
    hour,
    day,
    week,
    month,
    year,
    weekday)
SELECT start_time,
    EXTRACT(hour FROM date_time) AS hour,
    EXTRACT(day FROM date_time) AS day,
    EXTRACT(week FROM date_time) AS week,
    EXTRACT(month FROM date_time) AS month,
    EXTRACT(year FROM date_time) AS year,
    EXTRACT(weekday FROM date_time) AS weekday
FROM (SELECT DISTINCT ts AS start_time,
    TIMESTAMP 'epoch' + ts/1000 * interval '1 second' AS date_time
    FROM public.staging_event
    WHERE ts IS NOT NULL) AS event_time
ORDER BY start_time;


-- Songplays fact table. Each NextSong event is matched to a staged song by
-- title and artist name. The LEFT JOIN against songplays together with the
-- songplay_id IS NULL filter forms an anti-join, so an event that is already
-- present (same user and start time) is not inserted again when this cell
-- is re-run.
INSERT INTO songplays (
    start_time,
    user_id,
    level,
    song_id,
    artist_id,
    session_id,
    location,
    user_agent)
SELECT SE.ts AS start_time,
    SE.userId AS user_id,
    SE.level,
    SS.song_id,
    SS.artist_id,
    SE.sessionId AS session_id,
    SE.location,
    SE.userAgent AS user_agent
FROM public.staging_event SE
JOIN public.staging_song SS
    ON SE.song = SS.title
    AND SE.artist = SS.artist_name
LEFT OUTER JOIN songplays SP
    ON SE.userId = SP.user_id
    AND SE.ts = SP.start_time
WHERE SE.page = 'NextSong'
    AND SP.songplay_id IS NULL
    AND SE.userId IS NOT NULL
    AND SE.level IS NOT NULL
    AND SS.song_id IS NOT NULL
    AND SS.artist_id IS NOT NULL
    AND SE.sessionId IS NOT NULL
    AND SE.location IS NOT NULL
    AND SE.userAgent IS NOT NULL
ORDER BY SE.ts, SE.userId;