# Designing the ETL Process for the Sparkify AWS Data Warehouse

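This notebook is a scratchpad for experimenting with the ETL process of the Sparkify AWS (Redshift) data warehouse. It reads the cluster settings from `dwh.cfg`, connects to the cluster, and creates the staging and analytics tables. It then loads the staging tables from S3, populates the analytics tables from them, and finally drops all tables and closes the connection.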

In [12]:
import configparser
import json

import boto3
import pandas as pd
import psycopg2

from sql_queries import *

In [13]:
# Load the cluster, IAM and S3 settings from dwh.cfg.
config = configparser.ConfigParser()
# The `with` block guarantees the file handle is closed. read_file() raises FileNotFoundError
# when dwh.cfg is missing, whereas config.read() would silently leave the parser empty.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS credentials -- never display these.
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster definition
CLUSTER_IDENTIFIER     = config.get("DWH","CLUSTER_IDENTIFIER")
CLUSTER_TYPE           = config.get("DWH","CLUSTER_TYPE")
NUM_NODES              = config.get("DWH","NUM_NODES")
NODE_TYPE              = config.get("DWH","NODE_TYPE")

# Database connection parameters
HOST                   = config.get("CLUSTER","HOST")
DB_NAME                = config.get("CLUSTER","DB_NAME")
DB_USER                = config.get("CLUSTER","DB_USER")
DB_PASSWORD            = config.get("CLUSTER","DB_PASSWORD")
DB_PORT                = config.get("CLUSTER","DB_PORT")

IAM_ROLE_NAME          = config.get("IAM_ROLE", "IAM_ROLE_NAME")

# S3 source locations
LOG_DATA               = config.get("S3", "LOG_DATA")
LOG_JSONPATH           = config.get("S3", "LOG_JSONPATH")
SONG_DATA              = config.get("S3", "SONG_DATA")

# Preview the effective settings as a sanity check. Credentials are either excluded (KEY/SECRET)
# or masked (DB_PASSWORD) so they never end up in the saved notebook output. A single dict keeps
# each parameter name paired with its value.
settings_preview = {
    "CLUSTER_TYPE": CLUSTER_TYPE,
    "NUM_NODES": NUM_NODES,
    "NODE_TYPE": NODE_TYPE,
    "CLUSTER_IDENTIFIER": CLUSTER_IDENTIFIER,
    "DB_NAME": DB_NAME,
    "DB_USER": DB_USER,
    "DB_PASSWORD": "********",
    "DB_PORT": DB_PORT,
    "IAM_ROLE_NAME": IAM_ROLE_NAME,
    "LOG_DATA": LOG_DATA,
    "LOG_JSONPATH": LOG_JSONPATH,
    "SONG_DATA": SONG_DATA,
}
pd.DataFrame(list(settings_preview.items()), columns=["Param", "Value"])

Unnamed: 0,Param,Value
0,CLUSTER_TYPE,multi-node
1,NUM_NODES,4
2,NODE_TYPE,dc2.large
3,CLUSTER_IDENTIFIER,sparkifydwh
4,DB_NAME,spqrkify_dwh
5,DB_USER,sparkify_dwh_user
6,DB_PASSWORD,********
7,DB_PORT,5439
8,IAM_ROLE_NAME,redshift_role
9,LOG_DATA,'s3://udacity-dend/log_data'


## DB connection

In [14]:
# Connect to the cluster using the parameters parsed from the [CLUSTER] section of dwh.cfg.
# Keyword arguments are used instead of formatting config['CLUSTER'].values() into a DSN string.
# That approach depended on the key order in dwh.cfg (and on any DEFAULT-section keys, which a
# section proxy also yields), and it broke on values containing spaces.
conn = psycopg2.connect(
    host=HOST,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    port=DB_PORT,
)
cur = conn.cursor()
# Autocommit: each CREATE/COPY/INSERT/DROP below takes effect immediately, with no explicit commit.
conn.set_session(autocommit=True)

## Create tables

In [15]:
# Create the staging tables (filled further below by the *_copy queries from sql_queries).
for create_query in (staging_events_table_create, staging_songs_table_create):
    cur.execute(create_query)

In [16]:
# Create the analytics tables. The original statement order is preserved in case the DDL in
# sql_queries declares dependencies between them (not visible here).
for create_query in (
    songplay_table_create,
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
):
    cur.execute(create_query)

## Insert into staging tables

In [17]:
# Load the staging events table with `staging_events_copy` from sql_queries
# (presumably a COPY from LOG_DATA using LOG_JSONPATH -- confirm in sql_queries).
cur.execute(staging_events_copy)

In [18]:
# Load the staging songs table with `staging_songs_copy` from sql_queries
# (presumably a COPY from SONG_DATA -- confirm in sql_queries).
cur.execute(staging_songs_copy)

## Insert into analytics tables

In [19]:
# Populate the users analytics table with `user_table_insert` from sql_queries.
cur.execute(user_table_insert)

In [20]:
# Populate the songs analytics table with `song_table_insert` from sql_queries.
cur.execute(song_table_insert)

In [21]:
# Populate the artists analytics table with `artist_table_insert` from sql_queries.
cur.execute(artist_table_insert)

In [22]:
# Populate the time analytics table with `time_table_insert` from sql_queries.
cur.execute(time_table_insert)

In [23]:
# Populate the songplays analytics table with `songplay_table_insert` from sql_queries.
# It runs after the other analytics inserts; presumably the query joins staging events with
# song/artist data -- confirm in sql_queries.
cur.execute(songplay_table_insert)

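## Cleanup

**Warning:** the cells below drop every staging and analytics table and close the database connection. Skip them if you want to keep the loaded data.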

In [9]:
# Drop both staging tables.
for drop_query in (staging_events_table_drop, staging_songs_table_drop):
    cur.execute(drop_query)

In [10]:
# Drop the analytics tables, starting with songplays, in the original statement order.
for drop_query in (
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
):
    cur.execute(drop_query)

In [11]:
# Close the cluster connection; `cur` can no longer be used after this.
conn.close()