In [None]:
# install python s3fs package to read s3 bucket as filesystem
!pip install s3fs

In [1]:
import pandas as pd
import boto3
import json
import s3fs

# 1. Load DWH Params from a file

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


# 2. Create clients for EC2, S3, IAM, and Redshift

In [3]:
ec2 = boto3.resource('ec2',
                    region_name='us-west-2',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

s3 = boto3.resource('s3',
                    region_name='us-west-2',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

iam = boto3.client('iam',
                    region_name='us-west-2',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

redshift = boto3.client('redshift',
                    region_name='us-west-2',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

# 3. Explore Sparkify data sources on S3

In [4]:
# load S3 path for log, songs

LOG_DATA=config.get('S3','LOG_DATA')
LOG_JSONPATH=config.get('S3','LOG_JSONPATH')
SONG_DATA=config.get('S3','SONG_DATA')

In [5]:
LOG_DATA

"'s3://udacity-dend/log-data'"

In [6]:
# connect to Sparkify S3 bucket
sparkifyDbBucket =  s3.Bucket("udacity-dend")

In [7]:
# get the list of log data file from S3
log_data_files = [filename.key for filename in sparkifyDbBucket.objects.filter(Prefix='log-data')]
log_data_files[1:10]

['log-data/2018/11/2018-11-01-events.json',
 'log-data/2018/11/2018-11-02-events.json',
 'log-data/2018/11/2018-11-03-events.json',
 'log-data/2018/11/2018-11-04-events.json',
 'log-data/2018/11/2018-11-05-events.json',
 'log-data/2018/11/2018-11-06-events.json',
 'log-data/2018/11/2018-11-07-events.json',
 'log-data/2018/11/2018-11-08-events.json',
 'log-data/2018/11/2018-11-09-events.json']

In [8]:
# read one event-log file from S3 bucket

df_log = pd.read_json('s3://udacity-dend/{}'.format(log_data_files[1]), lines=True)

In [9]:
# check columns name to create Field for staging tables
df_log.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 18 columns):
artist           11 non-null object
auth             15 non-null object
firstName        15 non-null object
gender           15 non-null object
itemInSession    15 non-null int64
lastName         15 non-null object
length           11 non-null float64
level            15 non-null object
location         15 non-null object
method           15 non-null object
page             15 non-null object
registration     15 non-null int64
sessionId        15 non-null int64
song             11 non-null object
status           15 non-null int64
ts               15 non-null int64
userAgent        15 non-null object
userId           15 non-null int64
dtypes: float64(1), int64(6), object(11)
memory usage: 2.2+ KB


In [18]:
SONG_DATA

"'s3://udacity-dend/song-data'"

In [10]:
# get the list of song data file from S3 - File is big so be patient
song_data_files = [filename.key for filename in sparkifyDbBucket.objects.filter(Prefix='song-data')]

In [11]:
# check number of song to load to staging tables
len(song_data_files)

385253

In [12]:
# check song file list
song_data_files[0:10]

['song-data/',
 'song-data/A/A/A/TRAAAAK128F9318786.json',
 'song-data/A/A/A/TRAAAAV128F421A322.json',
 'song-data/A/A/A/TRAAABD128F429CF47.json',
 'song-data/A/A/A/TRAAACN128F9355673.json',
 'song-data/A/A/A/TRAAAEA128F935A30D.json',
 'song-data/A/A/A/TRAAAED128E0783FAB.json',
 'song-data/A/A/A/TRAAAEM128F93347B9.json',
 'song-data/A/A/A/TRAAAEW128F42930C0.json',
 'song-data/A/A/A/TRAAAFD128F92F423A.json']

In [16]:
# read one son-data file from S3 bucket

df_song = pd.read_json('s3://udacity-dend/{}'.format(song_data_files[1]), lines=True)

In [17]:
# check song-data columns name for staging table fields
df_song.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 10 columns):
artist_id           1 non-null object
artist_latitude     0 non-null float64
artist_location     1 non-null object
artist_longitude    0 non-null float64
artist_name         1 non-null object
duration            1 non-null float64
num_songs           1 non-null int64
song_id             1 non-null object
title               1 non-null object
year                1 non-null int64
dtypes: float64(3), int64(2), object(5)
memory usage: 160.0+ bytes


In [15]:
df_song.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [19]:
# read S3 bucket - log_json_path - define json file mapping to read from S3 for loading staging tables

df = pd.read_json('s3://udacity-dend/log_json_path.json')

In [20]:
df.jsonpaths

0            $['artist']
1              $['auth']
2         $['firstName']
3            $['gender']
4     $['itemInSession']
5          $['lastName']
6            $['length']
7             $['level']
8          $['location']
9            $['method']
10             $['page']
11     $['registration']
12        $['sessionId']
13             $['song']
14           $['status']
15               $['ts']
16        $['userAgent']
17           $['userId']
Name: jsonpaths, dtype: object

# 4. Configure AWS Redshit Cluster - Infrastructure as Code

## 4.1: IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [21]:
# Create the IAM role
try:
    print('4.1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
                                Path = '/',
                                RoleName = DWH_IAM_ROLE_NAME,
                                Description = 'Allows Redshift clusters to call AWS services on your behalf.',
                                AssumeRolePolicyDocument = json.dumps(
                                                                    {
                                                                        'Statement' : [{
                                                                                        'Action' : 'sts:AssumeRole',
                                                                                        'Effect' : 'Allow',
                                                                                        'Principal' : {'Service' : 'redshift.amazonaws.com'}
                                                                                        }],
                                                                        'Version' : '2012-10-17'
                                                                    }
                                                                    )

                            )
    

except Exception as e:
    print(e)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.


In [22]:
# Attach Policy
print('4.1.2 Attaching Policy')
iam.attach_role_policy(RoleName = DWH_IAM_ROLE_NAME,
                        PolicyArn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
                        )['ResponseMetadata']['HTTPStatusCode']

1.2 Attaching Policy


200

In [23]:
# Get and print the IAM role ARN
print('4.1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.3 Get the IAM role ARN
arn:aws:iam::964592634597:role/dwhRole


## 4.2 Redshift Cluster

- Create a RedShift Cluster

In [24]:
# Create a RedShift Cluster
try:
    response = redshift.create_cluster(        
        # add parameters for hardware
        ClusterType = DWH_CLUSTER_TYPE,
        NodeType = DWH_NODE_TYPE,
        NumberOfNodes = int(DWH_NUM_NODES),

        # add parameters for identifiers & credentials
        DBName = DWH_DB,
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
        MasterUsername = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,
        
        # add parameter for role (to allow s3 access)
        IamRoles = [roleArn]
    )
except Exception as e:
    print(e)

### 4.2.1 *Check* the cluster to see its status
- run this block several times until the cluster status becomes `Available`

In [29]:
#


def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-a8e792d0
7,NumberOfNodes,4


<h3> 4.2.2 Set the cluster <font color='red'> endpoint and role ARN </font> </h3>

In [30]:
# DWH_ENDPOINT must be set in dwh.cfg in [CLUSTER]HOST=
# DWH_ROLE_ARN must be set in dwh.cfg in [IAM_ROLE]ARN=
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT )
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::964592634597:role/dwhRole


<h3> to run correctly the **create_tables.py** and **etl.py** scripts <font color='red'> endpoint and role ARN </font> </h3>
Set DWH_ENDPOINT and DWH_ROLE_ARN for [CLUSTER] in dwh.cfg

## 4.3 Open an incoming  TCP port to access the cluster ednpoint

In [31]:
vpc = ec2.Vpc(id=myClusterProps['VpcId'])
defaultSg = list(vpc.security_groups.all())[0]
print(defaultSg)

ec2.SecurityGroup(id='sg-7accc02a')


In [32]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName= defaultSg.group_name,  # TODO: fill out
        CidrIp='0.0.0.0/0',  # TODO: fill out
        IpProtocol='TCP',  # TODO: fill out
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-7accc02a')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## 4.4 Check connection to the cluster

In [33]:
%load_ext sql

In [34]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

# 5. Sparkify ETL Process

## 5.1 Create  Staging, Facts and Dimension Tables in Redshift DWH

Run python script to create  Staging, Facts and Dimension Tables in Redshift DWH or open a Shell Terminal to run the following script

In [None]:
# run python script to create  Staging, Facts and Dimension Tables in Redshift DWH
# or open a Shell Terminal to run the following script
!./create_tables.py

## 5.2 Run ETL pipelines to Extract raw data to staging tables, Transform and Load data to Facts & Dimension tables

Run python script to process ETL pipleines to Extract raw data to staging tables, Transform and Load data to Facts & Dimension tables  
or open a Shell Terminal to run the following script  
ETL process is lengthy so please be patient -> **up or more than 1 hour to complete** to load the full song data to staging table.  

For checking development/checking purpose, we can reduce the number song data to load to staging table by using only a subset of song data :  
. full song data **s3://udacity-dend/song_data** (use **SONG_DATA** in dwh.cg)    
. subset of song data **s3://udacity-dend/song_data/A/A** (use **SONG_DATA_SUBSET** in dwh.cg)   

see question on Knowledge center [COPY Command runtime over 2 hours](https://knowledge.udacity.com/questions/113908)

In [None]:
# Query time on Redshift for ETL Process on full Song data > 1 hour
# For developement purpose, use SONG_DATA_SUBSET instead of SONG_DATA in sql_queries.py
# before running etl.py
!./etl.py

# 6. Check that DWH tables were populated correctly by ETL process

## 6.1 Check Staging Tables

count staging events records

In [35]:
%%sql
stag_event << select count(*) as stag_event from staging_events

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable stag_event


count staging records

In [36]:
%%sql
stag_song << select count(*) as stag_song from staging_songs

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable stag_song


In [37]:
# print count values for all  tables
print("Count Staging User = {}".format(stag_event))
print("Count Staging Song = {}".format(stag_song))

Count Staging User = +------------+
| stag_event |
+------------+
|    8056    |
+------------+
Count Staging Song = +-----------+
| stag_song |
+-----------+
|   385252  |
+-----------+


## 6.2 Check Fact and Dimension Tables

count fact_songplay records

In [39]:
%%sql
count_songplay << select count(*) as total_songplay from fact_songplay

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable count_songplay


count dim_users records

In [41]:
%%sql
count_user << select count(*) as total_user from dim_user

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable count_user


count dim_song records

In [42]:
%%sql
count_song << select count(*) as total_song from dim_song

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable count_song


count dim_artist records

In [43]:
%%sql
count_artist << select count(*) as total_artist from dim_artist

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable count_artist


count dim_time records

In [44]:
%%sql
count_time << select count(*) as total_time from dim_time

 * postgresql://dwhuser:***@dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.
Returning data to local variable count_time


print count values for all  tables

In [47]:
print("Count Songplay = {}".format(count_songplay))
print("Count User = {}".format(count_user))
print("Count Song = {}".format(count_song))
print("Count Artist = {}".format(count_artist))
print("Count Time = {}".format(count_time))

Count Songplay = +----------------+
| total_songplay |
+----------------+
|      6929      |
+----------------+
Count User = +------------+
| total_user |
+------------+
|    105     |
+------------+
Count Song = +------------+
| total_song |
+------------+
|   384995   |
+------------+
Count Artist = +--------------+
| total_artist |
+--------------+
|    45266     |
+--------------+
Count Time = +------------+
| total_time |
+------------+
|    8023    |
+------------+


# 7. Clean AWS Redshift Cluster

## 7.1 Clean Redshift Cluster - DO NOT RUN THIS UNLESS YOU ARE SURE 

In [48]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.cnl5ezt8wl23.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 6, 7, 14, 42, 37, 811000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-7accc02a',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-a8e792d0',
  'AvailabilityZone': 'us-west-2b',
  'PreferredMaintenanceWindow': 'sat:12:30-sat:13:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessibl

run this block several times until the cluster really deleted

In [52]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

## 7.2 Clean AWS IAM role/policy

In [53]:
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '354a79dc-6ba5-4638-a747-0612fe674cf2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '354a79dc-6ba5-4638-a747-0612fe674cf2',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Sun, 07 Jun 2020 16:44:02 GMT'},
  'RetryAttempts': 0}}