# AWS Project with S3 and Redshift
### Import the necessary packages

In [1]:
import pandas as pd
import boto3
import json

# STEP 0: Use your own AWS secret and access key from AWS

- Create a new IAM user in your AWS account
- Give it `AdministratorAccess`, From `Attach existing policies directly` Tab
- Take note of the access key and secret 
- Edit the file `dwh.cfg` in the same folder as this notebook and fill in the correct details
<font color='purple'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>
<font/>


# Load DWH Params from a config file

In [7]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

# NOTE: Un-comment this to print the result.
#pd.DataFrame({"Param":
#                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
#              "Value":
#                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
#             })

## Create clients for EC2, S3, IAM, and Redshift

In [8]:
import boto3

ec2 = boto3.resource(   'ec2', 
                      region_name="us-west-2",
                      aws_access_key_id=KEY,
                      aws_secret_access_key=SECRET)

s3 = boto3.resource(  's3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

iam = boto3.client('iam',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

redshift = boto3.client('redshift',
                        region_name="us-west-2",
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET)

## Check out the sample data sources on S3

In [9]:
LOG_DATA      = config.get("S3", "BUCKET")
logDataBucket = s3.Bucket(LOG_DATA)
count = 0

# Iterate over log_data bucket objects and print
for object in logDataBucket.objects.filter(Prefix='log_data'):
    count += 1
    print(object)
print("COUNT: " + str(count))
# => COUNT: 31

In [None]:
size = sum(1 for _ in logDataBucket.Bucket.objects.filter(Prefix='log_data'))
print(size)

In [10]:
SONG_DATA      = config.get("S3", "BUCKET")
songDataBucket = s3.Bucket(SONG_DATA)
count = 0

# Iterate over song_data bucket objects and print
for object in logDataBucket.objects.filter(Prefix='song_data'):
    count += 1
    print(object)
print("COUNT: " + str(count))

In [11]:
SONG_DATA      = config.get("S3", "BUCKET")
songDataBucket = s3.Bucket(SONG_DATA)
count = 0

# Iterate over song_data bucket objects and print
for object in logDataBucket.objects.filter(Prefix='log_json'):
    count += 1
    print(object)
print("COUNT: " + str(count))

In [12]:
LOG_DATA      = config.get("S3", "BUCKET")
LOCAL_PATH    = config.get("S3", "LOCAL_PATH")
SONGS_JSONPATH = config.get("S3", "SONGS_JSONPATH")
print(LOG_DATA)
print(SONGS_JSONPATH)

## STEP 1: IAM ROLE
- Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [16]:
# Create the IAM role (if not exists)

try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description="Allow Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'}}],
            'Version': '2012-10-17'})
    )
    

except Exception as e:
    print(e)


In [17]:
# Attach Policy

print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                      PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']


In [18]:
# Get and print the IAM role ARN
print('1.3 Get the IAM role ARN')
iam_role = iam.get_role(
                        RoleName=DWH_IAM_ROLE_NAME
                        )
roleArn = iam_role['Role']['Arn']
# NOTE: Un-comment this to print the result.
#print(roleArn)

## STEP 2:  Redshift Cluster

- Create a RedShift Cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [44]:
try:
    response = redshift.create_cluster( 
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

## 2.1 *Describe* the cluster to see its status
- run this block several times until the cluster status becomes `Available`

In [19]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
# NOTE: Un-comment this to print the result.
#prettyRedshiftProps(myClusterProps)

<h2> 2.2 Take note of the cluster <font color='red'> endpoint and role ARN </font> </h2>

<font color='red'>DO NOT RUN THIS unless the cluster status becomes "Available" </font>

In [21]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

## STEP 3: Open an incoming  TCP port to access the cluster endpoint

In [22]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName= defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP', 
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

## STEP 4: Make sure you can connect to the cluster

In [23]:
%load_ext sql

In [68]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
# NOTE: Un-comment this to print the result.
#print(conn_string)
%sql $conn_string

## STEP 5: Test COPIED and INSERTED data

### 5.1: Query staging tables

In [None]:
#%load_ext sql

In [26]:
# Number of items in staging_events table
%%time
%%sql
SELECT COUNT(*)
FROM staging_events;

In [None]:
# Number of items in staging_songs table
%%time
%%sql
SELECT COUNT(*)
FROM staging_songs;

### 5.2 Query Analysis tables

In [None]:
# Number of items in users table
%%time
%%sql
SELECT COUNT(*)
FROM users;

In [None]:
# Number of items in songs table
%%time
%%sql
SELECT COUNT(*)
FROM songs;

In [None]:
# Number of items in artists table
%%time
%%sql
SELECT COUNT(*)
FROM artists;

In [None]:
# Number of items in time table
%%time
%%sql
SELECT COUNT(*)
FROM time;

In [None]:
# Number of items in songplay table
%%time
%%sql
SELECT COUNT(*)
FROM songplays;

In [29]:
# Query to answer a question: Who played which song and when.
%%time
%%sql
SELECT  sp.songplay_id,
        u.user_id,
        u.last_name,
        u.first_name,
        sp.start_time,
        a.name,
        s.title
FROM songplays AS sp
        JOIN users   AS u ON (u.user_id = sp.user_id)
        JOIN songs   AS s ON (s.song_id = sp.song_id)
        JOIN artists AS a ON (a.artist_id = sp.artist_id)
        JOIN time    AS t ON (t.start_time = sp.start_time)
ORDER BY (u.last_name)
LIMIT 100;

## STEP 6: Clean up your resources

In [69]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
#redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

- run this block several times until the cluster really deleted

In [27]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
# NOTE: Un-comment this to print the result.
#prettyRedshiftProps(myClusterProps)

In [28]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
#iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
#iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!