In [1]:
%load_ext sql

In [1]:
#Import Python libraries
import boto3
import time
import configparser
import json

# Configurations for AWS

In [3]:
CONFIG_FILE = './config/dwh.cfg'
config = configparser.ConfigParser()
config.read_file(open(CONFIG_FILE))
KEY=config.get('AWS','key')
SECRET= config.get('AWS','secret')
DWH_IAM_ROLE_NAME = config.get("IAM","ROLE_NAME")

ENDPOINT = config.get("CLUSTER","HOST")
DB_NAME = config.get("CLUSTER","DB_NAME")
DB_USER = config.get("CLUSTER","DB_USER")
DB_PW = config.get("CLUSTER","DB_PASSWORD")
PORT_NUM= config.get("CLUSTER","DB_PORT")

DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

# Accesing AWS using clients

In [5]:
# Created clients for EC2, S3, IAM and Redshift
ec2 = boto3.resource('ec2',
                       region_name='us-west-2',
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )


s3 = boto3.resource('s3',
                    region_name='us-west-2',
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',
                   region_name='us-west-2',
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                  )

redshift = boto3.client('redshift',
                        region_name='us-west-2',
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET
                       )

# Creating IAM and update IAM Policy

In [8]:
# Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)
try:
    print('Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {
                'Statement': [
                    {
                        'Action': 'sts:AssumeRole',
                        'Effect': 'Allow',
                        'Principal': {'Service': 'redshift.amazonaws.com'}
                    }
                ],
                'Version': '2012-10-17'}
        )
    )
except Exception as e:
    print(e)

Creating a new IAM Role


In [9]:
#Attaching Policy
print('Attaching Policy')
iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)['ResponseMetadata']['HTTPStatusCode']

print('Get the IAM role ARN')
DWH_ROLE_ARN = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(DWH_ROLE_ARN)

Attaching Policy
Get the IAM role ARN
arn:aws:iam::279032087919:role/dwhRole


In [10]:
# Update config file with DWH_ROLE_ARN
config.set("IAM","ROLE_ARN", DWH_ROLE_ARN)
with open(CONFIG_FILE, "w+") as configfile:
    config.write(configfile)

# Create a Redshift Cluster

In [11]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DB_NAME,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PW,
        
        #Roles (for s3 access)
        IamRoles=[DWH_ROLE_ARN]  
    )
except Exception as e:
    print(e)

In [12]:
# Check if redshift cluster is available
status = 'Not Available'
while status != 'available':
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    status = myClusterProps['ClusterStatus']
    time.sleep(10)
print(f'red shift cluster is {status}')

red shift cluster is available


In [13]:
#Red shift end point
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
#Update Endpoint in Config file
config.set("CLUSTER","HOST", DWH_ENDPOINT)
with open(CONFIG_FILE, "w+") as configfile:
    config.write(configfile)

## Open an incoming  TCP port to access the cluster endpoint

In [18]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(PORT_NUM),
        ToPort=int(PORT_NUM)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-09b468afaac84fa0a')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [19]:
# Connecting to a cluster
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PW, DWH_ENDPOINT, PORT_NUM,DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://awsuser:3893739Spokan!@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev


'Connected: awsuser@dev'

# ETL

In [20]:
#Create tables (staging, fact,and dimension)
!python3 create_tables.py

Connected

Cursor Created

Dropping existing tables

Creating tables

Connection Closed


In [21]:
# Run ETL Process
!python3 etl.py

Connected

Cursor Created

Loading staging tables

inserted tables
Connection Closed


# Testing

In [22]:
# Print number of rows for Event staging table
%sql SELECT count(*) FROM staging_events;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
8056


In [23]:
# Print number of rows for Song staging table
%sql SELECT count(*) FROM staging_songs;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
14896


In [24]:
# Print number of rows for songplay table
%sql SELECT count(*) FROM songplays;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
333


In [25]:
# Print number of rows for users table
%sql SELECT count(*) FROM users;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
104


In [26]:
# Print number of rows for songs table
%sql SELECT count(*) FROM songs;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
14896


In [27]:
# Print number of rows for artists table
%sql SELECT count(*) FROM artists;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
10025


In [28]:
# Print number of rows for time table
%sql SELECT count(*) FROM time;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
333


## Sample analytical query

#### Show me the list of 10 frequently played songs and their frequencies

In [30]:
%%sql
SELECT s.title AS song, count(song) AS frequency 
FROM songplays sp 
JOIN songs s ON (s.song_id = sp.song_id) 
GROUP BY song
ORDER BY count(*) desc
LIMIT 10;

 * postgresql://awsuser:***@dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com:5439/dev
10 rows affected.


song,frequency
You're The One,37
I CAN'T GET STARTED,9
Catch You Baby (Steve Pitron & Max Sanna Radio Edit),9
Nothin' On You [feat. Bruno Mars] (Album Version),8
Hey Daddy (Daddy's Home),6
Make Her Say,5
Up Up & Away,5
Unwell (Album Version),4
Mr. Jones,4
Supermassive Black Hole (Album Version),4


## Resource Clean up

In [31]:
# Delete cluster
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'awsuser',
  'DBName': 'dev',
  'Endpoint': {'Address': 'dwhcluster.cvjd3f2vtcpt.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2023, 2, 6, 1, 4, 21, 211000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-09b468afaac84fa0a',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0473fb70fab422d24',
  'AvailabilityZone': 'us-west-2d',
  'PreferredMaintenanceWindow': 'fri:07:00-fri:07:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting

In [32]:
# Wait for cluster deletion
print('Redshift Cluster is getting getting deleted...')
cluster_status = 'deleting'
while cluster_status == "deleting":
    try:
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        cluster_status = myClusterProps['ClusterStatus']
        time.sleep(10)
    except Exception as e:
        break
print(f'{DWH_CLUSTER_IDENTIFIER} has been deleted!')

Redshift Cluster is getting getting deleted...
dwhCluster has been deleted!


In [33]:
# Delete IAM Role
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '57ab982a-a505-4ed7-813a-1fa95c8ee340',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '57ab982a-a505-4ed7-813a-1fa95c8ee340',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Mon, 06 Feb 2023 01:18:47 GMT'},
  'RetryAttempts': 0}}