# Creating Redshift Cluster using the AWS python SDK (Infrastructure-as-code)

In [13]:
import pandas as pd
import boto3
import json
import psycopg2

# Load DWH Params from a file

In [2]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


## Create clients and resources for IAM, EC2, S3, and Redshift

To interact with EC2 and S3, utilize `boto3.resource`; for IAM and Redshift, use `boto3.client`. If you require additional details on boto3, refer to the [boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html).

**Note**: We create clients and resources in the **us-west-2** region as the files are in us-west-2 on S3.

In [3]:
import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

# Check if we can access buckets and files on S3

In [4]:
DbBucket =  s3.Bucket("udacity-dend")
for obj in DbBucket.objects.filter(Prefix="log_json_path"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_json_path.json')


# STEP 1: IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly) and updating the dwh.cfg file for running the sql queries

In [5]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::557690598420:role/dwhRole


# STEP 2:  Redshift Cluster

- Create a [RedShift Cluster](https://console.aws.amazon.com/redshiftv2/home)
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [7]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

## 2.1 *Describe* the cluster to see its status
- run this block several times until the cluster status becomes `Available`

In [6]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

  pd.set_option('display.max_colwidth', -1)


Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.ciiyz6v68qka.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-056acecd13129bb4c
7,NumberOfNodes,4


<h2> 2.2 Take note of the cluster <font color='red'> endpoint and role ARN </font> </h2>

<font color='red'>DO NOT RUN THIS unless the cluster status becomes "Available". Make ure you are checking your Amazon Redshift cluster in the **us-west-2** region. </font>


In [7]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.ciiyz6v68qka.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::557690598420:role/dwhRole


## 2.3 Update the dwh.cfg with the ENDPOINT value

## STEP 3: Open an incoming  TCP port to access the cluster ednpoint

In [13]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0378f21c4c2a4c9d0')


# STEP 4: Connect to the cluster

In [8]:
%load_ext sql

In [9]:


conn = psycopg2.connect(
    host=DWH_ENDPOINT,
    dbname=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
    port=DWH_PORT
)
cur = conn.cursor()


# STEP 5 : RUNNING THE SCRIPTS TO LOAD DATA

## STEP 5.1: Running the create_tables.py script

In [30]:
%load_ext autoreload
%autoreload 2
# these lines are to make sure we keep pulling the latest script, helps when trying to 
# debug the changes made to the script while testing

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [59]:
# import create_tables, sql_queries

%run create_tables.py

Running drop statement validation queries...
Validation passed: Table 'songs' dropped.
Validation passed: Table 'users' dropped.
Validation passed: Table 'artists' dropped.
Validation passed: Table 'time' dropped.
Validation passed: Table 'songplays' dropped.
Running create statement validation queries...
Validation passed: Table 'songs' exists.
Validation passed: Table 'users' exists.
Validation passed: Table 'artists' exists.
Validation passed: Table 'time' exists.
Validation passed: Table 'songplays' exists.


## STEP 5.2: Running the etl.py script

In [69]:
%run etl.py
# SELECT *FROM stl_load_errors; for checking the errors when querying on redshift

Truncating tables so there is no duplicates on rerun if the script fails in between
Loading data into staging tables...
Running staging query:  staging_events...
Staging query executed successfully.
Running staging query:  staging_songs ...
Staging query executed successfully.
Validating staging tables...
Validation passed: 'staging_events' loaded with 8056 records.
Validation passed: 'staging_songs' loaded with 14896 records.

Inserting data into final tables...
Running insert query: 
INSERT INTO songplays (start_...
Insert query executed successfully.
Running insert query: 
INSERT INTO users (user_id, f...
Insert query executed successfully.
Running insert query: 
INSERT INTO songs (song_id, t...
Insert query executed successfully.
Running insert query: 
INSERT INTO artists (artist_i...
Insert query executed successfully.
Running insert query: 
INSERT INTO time (start_time,...
Insert query executed successfully.
Validating final tables...
Validation passed: 'songplays' populated with

# 6: Analytics time
Analytical questions we are trying to find that can help the business

The ERD Diag for reference

<img src="AWS REDSHIFT UDACITY ERD.png" width="50%"/>


### 6.1: How long is a user using the app on average

In [19]:

query = """ WITH next_play_calc AS (
    SELECT 
        user_id,
        session_id,
        start_time,
        LEAD(start_time) OVER 
            (PARTITION BY user_id, session_id ORDER BY start_time) AS next_play_time
    FROM songplays
), 
duration_per_session AS (
    SELECT 
        user_id, 
        session_id, 
        start_time, 
        COALESCE(EXTRACT(epoch FROM next_play_time - start_time), 0) AS play_duration
    FROM next_play_calc
), 
total_session_play AS (
    SELECT 
        user_id, 
        session_id, 
        SUM(play_duration) AS total_session_duration
    FROM duration_per_session
    GROUP BY user_id, session_id
), 
avg_session_play AS (
    SELECT 
        user_id, 
        ROUND(AVG(total_session_duration/60)) AS avg_session_duration -- converting seconds to minutes
    FROM total_session_play
    GROUP BY user_id
) select * from avg_session_play order by avg_session_duration desc;""" 
df = pd.read_sql(query, conn)
display(df.head())
print(f"Average session duration across our users - {round(df['avg_session_duration'].mean())} minutes")

Unnamed: 0,user_id,avg_session_duration
0,30,164.0
1,24,137.0
2,72,99.0
3,97,92.0
4,15,85.0


Average session duration across our users - 23 minutes


### 6.2: Checking for nulls in key columns of songplays table

In [94]:
query = """SELECT 
    SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS null_user_id,
    SUM(CASE WHEN song_id IS NULL THEN 1 ELSE 0 END) AS null_song_id,
    SUM(CASE WHEN artist_id IS NULL THEN 1 ELSE 0 END) AS null_artist_id,
    SUM(CASE WHEN start_time IS NULL THEN 1 ELSE 0 END) AS null_start_time
FROM songplays;
"""
df = pd.read_sql(query, conn)
display(df)

Unnamed: 0,null_user_id,null_song_id,null_artist_id,null_start_time
0,0,0,0,0


### 6.3: Top 10 Most Played Songs

In [91]:
query = """SELECT 
    s.title, 
    a.name AS artist_name, 
    COUNT(sp.songplay_id) AS play_count
FROM songplays sp
JOIN songs s ON sp.song_id = s.song_id
JOIN artists a ON sp.artist_id = a.artist_id
GROUP BY s.title, a.name
ORDER BY play_count DESC
LIMIT 10;
"""
df = pd.read_sql(query, conn)
display(df)

Unnamed: 0,title,artist_name,play_count
0,You're The One,Dwight Yoakam,37
1,Supermassive Black Hole (Album Version),Muse,28
2,Hey Daddy (Daddy's Home),Usher,18
3,Catch You Baby (Steve Pitron & Max Sanna Radio Edit),Lonnie Gordon,18
4,Girlfriend In A Coma,The Smiths,12
5,The Boy With The Thorn In His Side,The Smiths,12
6,From The Ritz To The Rubble,Arctic Monkeys,9
7,Fade To Black,Metallica,9
8,If I Ain't Got You,Alicia Keys,9
9,I CAN'T GET STARTED,Ron Carter,9


### 6.4: Check how many months worth of data we have in our fact table songplays

In [98]:
query = """SELECT 
    DISTINCT EXTRACT(month from sp.start_time) AS month,
    EXTRACT(year from sp.start_time) AS year
FROM songplays sp;
"""
df = pd.read_sql(query, conn)
display(df)

Unnamed: 0,month,year
0,11,2018


Since the activity is only for the month of Nov 2018, we cant do analysis for over the year which artists were popular

### 6.5: User activity analysis
 Intentionally not joining to the PII information from the user table here

In [100]:
query = """SELECT 
    sp.user_id, 
    COUNT(DISTINCT sp.session_id) AS session_count, 
    COUNT(DISTINCT sp.song_id) AS unique_songs_played
FROM songplays sp
GROUP BY sp.user_id
ORDER BY session_count DESC, user_id asc
LIMIT 10; """
df = pd.read_sql(query, conn)
display(df)

Unnamed: 0,user_id,session_count,unique_songs_played
0,49,21,35
1,80,17,30
2,97,11,27
3,15,8,13
4,44,7,21
5,88,7,16
6,25,6,9
7,29,6,13
8,36,6,13
9,73,6,15


### 6.6: How many users are transitioning from FREE TO PAID subscription

In [104]:
query = """WITH UserChanges AS (
    SELECT
        u.user_id,
        u.level,
        ROW_NUMBER() OVER(PARTITION BY u.user_id ORDER BY s.start_time) AS rn
    FROM users u 
    JOIN songplays s ON u.user_id=s.user_id
    WHERE u.level IN ('free', 'paid')
),
UserTransitions AS (
    SELECT
        uc1.user_id
    FROM UserChanges uc1
    JOIN UserChanges uc2
    ON uc1.user_id = uc2.user_id
    WHERE uc1.rn = 1 AND uc1.level = 'free'
      AND uc2.rn = 2 AND uc2.level = 'paid'
)
SELECT COUNT(DISTINCT user_id) AS conversions
FROM UserTransitions;
"""
df = pd.read_sql(query, conn)
display(df)

Unnamed: 0,conversions
0,0


In Nov-2018 data, no user converted from free to paid

# STEP 5: Clean up your resources

<b><font color='red'>DO NOT RUN THIS UNLESS YOU ARE SURE <br/> 
    We will be using these resources in the next exercises</span></b>

In [20]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.ciiyz6v68qka.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 10, 29, 5, 27, 48, 42000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0378f21c4c2a4c9d0',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-056acecd13129bb4c',
  'AvailabilityZone': 'us-west-2a',
  'PreferredMaintenanceWindow': 'thu:09:30-thu:10:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'P

- run this block several times until the cluster really deleted

In [22]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)



Passing a negative integer is deprecated in version 1.0 and will not be supported in future version. Instead, use None to not limit the column width.



Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.ciiyz6v68qka.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-056acecd13129bb4c
7,NumberOfNodes,4


In [23]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': 'ff094154-78d0-474d-917a-c3a2aa52eefc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Tue, 29 Oct 2024 15:13:06 GMT',
   'x-amzn-requestid': 'ff094154-78d0-474d-917a-c3a2aa52eefc',
   'content-type': 'text/xml',
   'content-length': '200'},
  'RetryAttempts': 0}}