In [19]:
import pandas as pd
import boto3
import json
import configparser
import psycopg2

config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [2]:
%load_ext sql

### Using `configparser` to load all our varibales 

In [3]:
HOST=config.get('CLUSTER', 'HOST')
DB_NAME=config.get('CLUSTER', 'DB_NAME')
DB_USER=config.get('CLUSTER', 'DB_USER')
DB_PASSWORD=config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT=config.get('CLUSTER', 'DB_PORT')
ARN = config.get('IAM_ROLE', 'ARN')
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

In [4]:
# Varify that we can extract all the configurations needed
print('{}{}{}{}'.format(*config['CLUSTER'].values()))

dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.comdwhdwhuserPassw0rd


## Connect to our Redshift cluster

In [5]:
#conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
# cur = conn.cursor()
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, HOST, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com:5439/dwh


## Create our `redshift` and `S3` clients using `boto3` AWS API

In [6]:
redshift = boto3.client('redshift',
                 aws_access_key_id=KEY,
                 aws_secret_access_key=SECRET,
                 region_name='us-west-2')

s3 = boto3.resource('s3',
                 aws_access_key_id=KEY,
                 aws_secret_access_key=SECRET,
                 region_name='us-west-2')

Check the cluster is available and running 

In [7]:
keys = ['ClusterIdentifier','NodeType','ClusterStatus','MasterUsername','Endpoint']
cluster_dict = {k:v for k,v in redshift.describe_clusters()['Clusters'][0].items() if k in keys}
cluster_dict

{'ClusterIdentifier': 'dwhcluster',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'MasterUsername': 'dwhuser',
 'Endpoint': {'Address': 'dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com',
  'Port': 5439}}

In [8]:
cluster_dentifier = cluster_dict['ClusterIdentifier']
# if the status is paused we can run the below to resume the cluster
redshift.resume_cluster(ClusterIdentifier=cluster_dentifier)
# if the status is avaible and we wish to Pause the cluster we can run the command below
# redshift.pause_cluster(ClusterIdentifier=cluster_dentifier)

### S3 Bucket and Folders for the JSON Data

In [9]:
log_data = config.get('S3', 'LOG_DATA')
log_json = config.get('S3', 'LOG_JSONPATH')
song_data = config.get('S3', 'SONG_DATA')

In [10]:
log_json

's3://udacity-dend/log_json_path.json'

In [11]:
sampleDbBucket =  s3.Bucket("udacity-dend")

Let's view the first 5 log files

In [12]:
for i, obj in enumerate(sampleDbBucket.objects.filter(Prefix='log_data')):
    print(obj.key)
    if i >= 5:
        break

log_data/
log_data/2018/11/2018-11-01-events.json
log_data/2018/11/2018-11-02-events.json
log_data/2018/11/2018-11-03-events.json
log_data/2018/11/2018-11-04-events.json
log_data/2018/11/2018-11-05-events.json


Let's view the first 5 metadata files

In [13]:
for i, obj in enumerate(sampleDbBucket.objects.filter(Prefix='song_data')):
    print(obj.key)
    if i >= 5:
        break

song_data/
song_data/A/A/A/TRAAAAK128F9318786.json
song_data/A/A/A/TRAAAAV128F421A322.json
song_data/A/A/A/TRAAABD128F429CF47.json
song_data/A/A/A/TRAAACN128F9355673.json
song_data/A/A/A/TRAAAEA128F935A30D.json


### Data Exploration using Pandas 
#### Read our song metadata files into a Pandas DataFrame

In [14]:
sampleDbBucket =  s3.Bucket("udacity-dend")
for i, obj in enumerate(sampleDbBucket.objects.filter(Prefix='log_data')):
    if i == 1:
        df = pd.read_json(obj.get()['Body'].read(), lines=True)
    elif i > 1:
        df = pd.concat([df, pd.read_json(obj.get()['Body'].read(), lines=True)])

In [36]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 8056 entries, 0 to 387
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   artist         6820 non-null   object 
 1   auth           8056 non-null   object 
 2   firstName      7770 non-null   object 
 3   gender         7770 non-null   object 
 4   itemInSession  8056 non-null   int64  
 5   lastName       7770 non-null   object 
 6   length         6820 non-null   float64
 7   level          8056 non-null   object 
 8   location       7770 non-null   object 
 9   method         8056 non-null   object 
 10  page           8056 non-null   object 
 11  registration   7770 non-null   float64
 12  sessionId      8056 non-null   int64  
 13  song           6820 non-null   object 
 14  status         8056 non-null   int64  
 15  ts             8056 non-null   int64  
 16  userAgent      7770 non-null   object 
 17  userId         8056 non-null   object 
dtypes: float6

Examine our String fields, max length (character) values 

In [75]:
for i in df:
    if df[i].dtype == 'object':
        print(i, df[i].str.len().max())
    else:
        print(i, df[i].dtype)

artist 89.0
auth 10
firstName 10.0
gender 1.0
itemInSession int64
lastName 9.0
length float64
level 4
location 46.0
method 3
page 16
registration float64
sessionId int64
song 151.0
status int64
ts int64
userAgent 139.0
userId 3.0


#### Read our activity log files into a Pandas DataFrame (first `1000` files)

In [150]:
sampleDbBucket =  s3.Bucket("udacity-dend")
for i, obj in enumerate(sampleDbBucket.objects.filter(Prefix='song_data')):
    if i == 1:
        df_s = pd.read_json(obj.get()['Body'].read(), lines=True)
    elif i > 1:
        df_s = pd.concat([df_s, pd.read_json(obj.get()['Body'].read(), lines=True)])
    if i >= 1000:
        break

In [152]:
df_s.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000 entries, 0 to 0
Data columns (total 10 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   artist_id         1000 non-null   object 
 1   artist_latitude   357 non-null    float64
 2   artist_location   1000 non-null   object 
 3   artist_longitude  357 non-null    float64
 4   artist_name       1000 non-null   object 
 5   duration          1000 non-null   float64
 6   num_songs         1000 non-null   int64  
 7   song_id           1000 non-null   object 
 8   title             1000 non-null   object 
 9   year              1000 non-null   int64  
dtypes: float64(3), int64(2), object(5)
memory usage: 85.9+ KB


In [153]:
df_s.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARJNIUY12298900C91,,,,Adelitas Way,213.9424,1,SOBLFFE12AF72AA5BA,Scream,2009
0,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
0,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969
0,AR9Q9YC1187FB5609B,,New Jersey,,Quest_ Pup_ Kevo,252.94322,1,SOFRDWL12A58A7CEF7,Hit Da Scene,0
0,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001


In [154]:
for i in df_s:
    if df_s[i].dtype == 'object':
        print(i, df_s[i].str.len().max())
    else:
        print(i, df_s[i].dtype)

artist_id 18
artist_latitude float64
artist_location 44.0
artist_longitude float64
artist_name 72.0
duration float64
num_songs int64
song_id 18
title 86
year int64


In [211]:
df.columns

Index(['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
       'length', 'level', 'location', 'method', 'page', 'registration',
       'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId'],
      dtype='object')

## SQL to Create our Schema for Staging Area

In [15]:
%%sql
CREATE SCHEMA IF NOT EXISTS staging_area;
SET search_path TO staging_area;

DROP TABLE IF EXISTS staging_events_table;
DROP TABLE IF EXISTS staging_songs_table;


CREATE TABLE staging_events_table
(
    artist VARCHAR(250),
    auth VARCHAR(10),
    firstName VARCHAR(50),
    gender VARCHAR(1),
    itemInSession SMALLINT,
    lastName VARCHAR(50),
    length DECIMAL(9,5),
    level VARCHAR(4),
    location VARCHAR(50),
    method VARCHAR(3),
    page VARCHAR(16),
    registration DOUBLE PRECISION,
    sessionId SMALLINT,
    song VARCHAR(250),
    status SMALLINT,
    ts BIGINT, 
    userAgent VARCHAR(150),
    userId VARCHAR(3)
);


CREATE TABLE staging_songs_table
(
    artist_id VARCHAR(250),
    artist_latitude DECIMAL(9,6),
    artist_location VARCHAR(250),
    artist_longitude DECIMAL(9,6),
    artist_name VARCHAR(250),
    duration DECIMAL(9,5),
    num_songs SMALLINT,
    song_id VARCHAR(20),
    title VARCHAR(250),
    year SMALLINT
);


 * postgresql://dwhuser:***@dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.


[]

### Extract (COPY) our Metadata files to a Staging Table

In [16]:
%%sql
copy staging_songs_table 
from 's3://udacity-dend/song_data/'
credentials 'aws_iam_role=arn:aws:iam::104737435278:role/dwhRole'
region 'us-west-2'
json 'auto ignorecase';

 * postgresql://dwhuser:***@dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

### Extract (COPY) our Log files to a Staging Table

In [18]:
%%sql
copy staging_events_table 
from 's3://udacity-dend/log_data/'
credentials 'aws_iam_role=arn:aws:iam::104737435278:role/dwhRole'
region 'us-west-2'
json 's3://udacity-dend/log_json_path.json';

 * postgresql://dwhuser:***@dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

## Data Warehouse Tables (Fact and Dimensions)

In [356]:
%%sql

CREATE SCHEMA IF NOT EXISTS sparkify;
SET search_path TO sparkify;

DROP TABLE IF EXISTS fact_songplays;
DROP TABLE IF EXISTS dim_users; 
DROP TABLE IF EXISTS dim_songs;
DROP TABLE IF EXISTS dim_artist;
DROP TABLE IF EXISTS dim_time;


CREATE TABLE dim_users (
    user_id smallint,
    first_name varchar(50) NOT NULL,
    last_name varchar(50) NOT NULL,
    gender varchar(1),
    level varchar(4) NOT NULL,
    PRIMARY KEY(user_id)
)
DISTSTYLE all
SORTKEY(user_id);


CREATE TABLE dim_artist (
    artist_id varchar(250),
    name varchar(250) NOT NULL,
    location varchar(250), 
    latitude DECIMAL(9,6),
    longitude DECIMAL(9,6),
    PRIMARY KEY(artist_id)
)
DISTSTYLE KEY
DISTKEY(artist_id)
SORTKEY(artist_id);

CREATE TABLE dim_songs (
    song_id varchar(20),
    title varchar(250) NOT NULL,
    artist_id varchar(250)  NOT NULL,
    year smallint,
    duration DECIMAL(9,5),
    PRIMARY KEY(song_id),
    FOREIGN KEY(artist_id) references dim_artist(artist_id)
)
DISTSTYLE KEY
DISTKEY(song_id)
SORTKEY(song_id, artist_id)
;

CREATE TABLE dim_time(
    start_time timestamp,
    hour smallint,
    day smallint,
    week smallint,
    month smallint, 
    year smallint, 
    weekday smallint,
    PRIMARY KEY(start_time)
)
DISTSTYLE KEY
DISTKEY(start_time)
SORTKEY(start_time);


CREATE TABLE fact_songplays (
    songplay_id bigint IDENTITY(1,1),
    start_time timestamp NOT NULL,
    user_id smallint NOT NULL,
    level varchar(4),
    song_id varchar(20),
    artist_id varchar(250),
    session_id SMALLINT NOT NULL,
    location varchar(50),
    user_agent varchar(150),
    PRIMARY KEY(songplay_id),
    FOREIGN KEY(user_id) references dim_users(user_id),
    FOREIGN KEY(song_id) references dim_songs(song_id),
    FOREIGN KEY(artist_id) references dim_artist(artist_id),
    FOREIGN KEY(start_time) references dim_time(start_time)
)
DISTSTYLE KEY
DISTKEY(start_time)
SORTKEY(start_time, user_id, song_id, artist_id)
;

 * postgresql://dwhuser:***@dwhcluster.cofikax0oj2p.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]