# Set up AWS Environment

## Import Python packages and modules

In [1]:
import configparser
import json
import time
from urllib.parse import quote_plus

import boto3
import botocore

import create_tables
import etl

## Read credentials and cluster settings from the configuration file

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did read; fail loudly here rather than with a NoSectionError below.
if not config.read('dwh.cfg'):
    raise FileNotFoundError('Could not read configuration file "dwh.cfg".')

# Despite the key name, ARN holds the IAM role *name*: it is passed as RoleName below.
ARN = config.get('IAM_ROLE', 'ARN')
HOST = config.get('CLUSTER', 'HOST')

# Cluster shape (DWH_CLUSTER_IDENTIFIER was previously read twice; once is enough).
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')

# Database connection settings.
DB_NAME = config.get('CLUSTER', 'DB_NAME')
DB_USER = config.get('CLUSTER', 'DB_USER')
DB_PASSWORD = config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT = config.get('CLUSTER', 'DB_PORT')

## Create AWS Clients

In [3]:
# Resource (higher-level, object-oriented) interfaces for EC2 and S3.
ec2, s3 = (boto3.resource(service) for service in ('ec2', 's3'))
# Low-level clients for IAM and Redshift.
iam, redshift = (boto3.client(service) for service in ('iam', 'redshift'))

## Create IAM role with read access to S3 

In [4]:
def createIamRole():
    '''Create the Redshift IAM role with S3 read-only access and return its ARN.

    The role name is the value of the ``ARN`` key in the ``IAM_ROLE`` config
    section. The role can be assumed by redshift.amazonaws.com. If a role with
    that name already exists it is reused, so re-running this cell is safe.

    Returns:
        The role's ARN string, or None if an AWS call failed (the error is printed).
    '''
    try:
        print('1.1 Creating a new IAM Role')
        try:
            iam.create_role(
                Path='/',
                RoleName=ARN,
                Description="Allows Redshift clusters to call AWS services on your behalf.",
                AssumeRolePolicyDocument=json.dumps(
                    {'Statement': [{'Action': 'sts:AssumeRole',
                                    'Effect': 'Allow',
                                    'Principal': {'Service': 'redshift.amazonaws.com'}}],
                     'Version': '2012-10-17'})
            )
        except iam.exceptions.EntityAlreadyExistsException:
            # Re-run of the notebook: keep going with the existing role.
            print('IAM Role "{}" already exists; reusing it.'.format(ARN))

        print('1.2 Attaching a Policy')
        iam.attach_role_policy(RoleName=ARN,
                               PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")

        print('1.3 Get the IAM role ARN')
        roleArn = iam.get_role(RoleName=ARN)['Role']['Arn']

        print('IAM Role created successfully.')
        return roleArn
    except Exception as e:
        print(e)
        return None

createIamRole()

1.1 Creating a new IAM Role
1.1 Attaching a Policy
1.3 Get the IAM role ARN
IAM Role created successfully.


'arn:aws:iam::488216229776:role/sparkifyRole'

## Launch the Redshift cluster

In [5]:
def createRedshiftCluster(pollInterval=10, timeout=30 * 60):
    '''Launch the Redshift cluster configured in dwh.cfg and wait until it is available.

    The cluster is attached to the IAM role whose name is the ``ARN`` config value
    and is publicly accessible. If a cluster with the same identifier already
    exists it is reused and waited on.

    Args:
        pollInterval: seconds between status checks.
        timeout: maximum seconds to wait for the cluster to become 'available'.

    Returns:
        The cluster endpoint address, or None if creation failed or timed out.
    '''
    roleArn = iam.get_role(RoleName=ARN)['Role']['Arn']
    print('Creating cluster "{}".'.format(DWH_CLUSTER_IDENTIFIER))
    try:
        redshift.create_cluster(
            ClusterType=DWH_CLUSTER_TYPE,
            NodeType=DWH_NODE_TYPE,
            NumberOfNodes=int(DWH_NUM_NODES),

            DBName=DB_NAME,
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            MasterUsername=DB_USER,
            MasterUserPassword=DB_PASSWORD,

            IamRoles=[roleArn],

            PubliclyAccessible=True
        )
    except redshift.exceptions.ClusterAlreadyExistsFault:
        # Re-run of the notebook: wait on the existing cluster instead of failing.
        print('Cluster "{}" already exists.'.format(DWH_CLUSTER_IDENTIFIER))
    except Exception as e:
        # Without a successful create there is no cluster to poll.
        print(e)
        return None

    print('Cluster creation is still in progress, checking status every {} seconds.'.format(pollInterval))
    deadline = time.monotonic() + timeout
    while True:
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        status = myClusterProps['ClusterStatus']
        if status == 'available':
            break
        # Sleep on every non-available status (not only 'creating') so the loop never
        # busy-spins, and bound the total wait so a stuck cluster cannot hang the cell.
        if time.monotonic() >= deadline:
            print('Timed out waiting for cluster "{}" (last status "{}").'.format(DWH_CLUSTER_IDENTIFIER, status))
            return None
        time.sleep(pollInterval)

    sparkifyHost = myClusterProps['Endpoint']['Address']
    print('sparkifyHost: ', sparkifyHost)
    print('Created Redshift cluster "{}".'.format(DWH_CLUSTER_IDENTIFIER))
    return sparkifyHost

createRedshiftCluster()

Creating cluster "sparkifycluster".
Cluster creation is still in progress, checking status every 10 seconds.
sparkifyHost:  sparkifycluster.ci6me8yds14k.us-east-1.redshift.amazonaws.com
Created Redshift cluster "sparkifycluster".


## Open the cluster's TCP port in the VPC's 'default' security group

In [6]:
def openTCP(cidrIp='0.0.0.0/0'):
    '''Allow inbound TCP on DB_PORT through the cluster VPC's 'default' security group.

    Args:
        cidrIp: source address range allowed to connect. The default 0.0.0.0/0
            exposes the database port to the entire internet; narrow it (e.g. to
            your own address as "a.b.c.d/32") whenever possible.

    AWS errors (for example InvalidPermission.Duplicate when the rule already
    exists) are printed rather than raised so the notebook can continue.
    '''
    try:
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        vpc = ec2.Vpc(id=myClusterProps['VpcId'])
        # Select by name: the order of security_groups.all() is not guaranteed,
        # so a positional index can pick an unrelated group.
        defaultSg = next(
            (sg for sg in vpc.security_groups.all() if sg.group_name == 'default'),
            None,
        )
        if defaultSg is None:
            print('No "default" security group found in VPC {}.'.format(vpc.id))
            return
        print(defaultSg)
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp=cidrIp,
            IpProtocol='TCP',
            FromPort=int(DB_PORT),
            ToPort=int(DB_PORT)
        )
    except Exception as e:
        print(e)
openTCP()

ec2.SecurityGroup(id='sg-05390f6b6bd310aa7')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# ETL Pipeline: create the staging, fact, and dimension tables on Redshift, load the raw data from S3 into the staging tables, then populate the fact and dimension tables from them

## Connect to 'sparkifydb' and create the necessary tables 

In [7]:
# Connect to the database and create the tables (implementation lives in create_tables.main).
create_tables.main()

## Confirm that the tables were created

In [None]:
# Load the IPython extension that provides the %sql magic used below
# (presumably ipython-sql / jupysql — confirm it is installed in the kernel).
%load_ext sql

In [None]:
# SQLAlchemy-style URL for the %sql magic. User and password are URL-quoted so
# characters such as '@', ':' or '/' in them cannot break the URL.
CONN_TEMPLATE = "postgresql://{}:{}@{}:{}/{}"
conn_string = CONN_TEMPLATE.format(quote_plus(DB_USER), quote_plus(DB_PASSWORD), HOST, DB_PORT, DB_NAME)
# Never echo the real password: notebook outputs are saved and shared.
print(CONN_TEMPLATE.format(quote_plus(DB_USER), '********', HOST, DB_PORT, DB_NAME))

In [None]:
# Open the %sql connection; IPython expands $conn_string to the URL built above.
%sql $conn_string

In [None]:
# Row count of every table as a quick sanity check that the tables exist
# (and, after the ETL step, that data was loaded).
TABLES = ['songplays', 'users', 'songs', 'artists', 'time', 'events_staging', 'songs_staging']

tableCounts = {}
for table in TABLES:
    # Same call that `x = %sql select ...` expands to, but usable inside a loop.
    result = get_ipython().run_line_magic('sql', 'select count(*) from {};'.format(table))
    tableCounts[table] = result[0][0]

labelWidth = max(len(table) for table in TABLES)
for table, count in tableCounts.items():
    print('{:<{}} = {}'.format(table, labelWidth, count))

### Read the `log_json_path.json` file from S3

In [None]:
# Fetch and show the JSONPaths file stored in the udacity-dend bucket
# (presumably the one referenced by the events COPY — confirm in the ETL SQL).
# Distinct names matter here: rebinding `json` would shadow the json module
# (breaking json.dumps in createIamRole on re-run), and `s3` is the S3 resource.
s3Client = boto3.client('s3')
resp = s3Client.get_object(Bucket='udacity-dend', Key='log_json_path.json')
logJsonPath = json.loads(resp['Body'].read())
print(json.dumps(logJsonPath, indent=2))

## Load staging tables and database tables

In [8]:
# Load the staging tables and then the database tables (implementation lives in etl.main).
# NOTE(review): the saved run ended with QueryCanceledError; if a load fails,
# inspect stl_load_errors with the next cell.
etl.main()

QueryCanceledError: Query cancelled on user's request


In [None]:
# Show the most recent row of Redshift's stl_load_errors system table (useful when etl.main() fails).
%sql select * from stl_load_errors order by starttime desc limit 1

# Clean up AWS resources: run the following cells ONLY when all ETL processing is complete!

## Delete IAM Role

In [9]:
def deleteIamResources(iamRole):
    '''Detach the S3 read-only policy from ``iamRole`` and delete the role.

    Safe to re-run: an already-detached policy or an already-deleted role is
    reported instead of raising, so a partial cleanup can be completed.

    Args:
        iamRole: name of the IAM role to delete.
    '''
    try:
        iam.detach_role_policy(RoleName=iamRole, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
    except iam.exceptions.NoSuchEntityException:
        # Policy not attached (or role missing); nothing to detach, proceed to delete.
        pass
    try:
        iam.delete_role(RoleName=iamRole)
    except iam.exceptions.NoSuchEntityException:
        print('IAM Role "{}" does not exist.'.format(iamRole))
        return
    print('Deleted IAM Role Resources.')

deleteIamResources(ARN)

Deleted IAM Role Resources.


## Delete the Redshift cluster

In [10]:
def deleteRedshiftResources(iamRole):
    '''Delete the Redshift cluster (no final snapshot) and block until it is gone.

    Args:
        iamRole: unused; kept so existing calls continue to work.

    Only a missing cluster is reported as "no longer exists"; any other AWS
    error (permissions, invalid cluster state, ...) is printed as-is.
    '''
    try:
        redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
        print('Cluster deletion in progress, checking status every 10 seconds.')
        while True:
            time.sleep(10)
            # describe_clusters raises ClusterNotFoundFault once the cluster is gone;
            # it does not return an empty result for a specific identifier.
            redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
    except redshift.exceptions.ClusterNotFoundFault:
        print('The cluster "{}" no longer exists.'.format(DWH_CLUSTER_IDENTIFIER))
    except botocore.exceptions.ClientError as e:
        print(e)

deleteRedshiftResources(ARN)

Cluster deletion in progress, checking status every 10 seconds.
The cluster "sparkifycluster" no longer exists.
