## Importing packages

In [1]:
import pandas as pd
import boto3
import configparser
from botocore.exceptions import ClientError
import json

## Load database params

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))


KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })



Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhadmin


## Creating clients

In [3]:
ec2 = boto3.resource('ec2',
                     region_name = 'us-east-1',
                     aws_access_key_id = KEY,
                     aws_secret_access_key = SECRET)

s3 = boto3.resource('s3',
                    region_name = 'us-east-1',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

iam = boto3.client('iam',
                    region_name = 'us-east-1',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

redshift = boto3.client('redshift',
                    region_name = 'us-east-1',
                    aws_access_key_id = KEY,
                    aws_secret_access_key = SECRET)

## Creating an IAM Role that makes Redshift able to access s3 bucket

In [6]:
# creating an IAM role
dwhRole = iam.create_role(
    Path = '/',
    RoleName = DWH_IAM_ROLE_NAME,
    Description = 'Allows Redshift clusters to call AWS services on your behalf',
    AssumeRolePolicyDocument = json.dumps({
        'Statement': [{'Action': 'sts:AssumeRole',
                       'Effect': 'Allow',
                       'Principal': {'Service': 'redshift.amazonaws.com'}}],
        'Version': '2012-10-17'
    })
)

# attaching policy
iam.attach_role_policy(RoleName = DWH_IAM_ROLE_NAME,
                       PolicyArn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
                       )['ResponseMetadata']['HTTPStatusCode']

# getting the IAM role ARN
roleArn = iam.get_role(RoleName = DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

ClientError: An error occurred (AccessDenied) when calling the CreateRole operation: User: arn:aws:iam::071061792069:user/dwhadmin is not authorized to perform: iam:CreateRole on resource: arn:aws:iam::071061792069:role/dwhadmin with an explicit deny in an identity-based policy

## Creating a redshift cluster

In [None]:
response = redshift.create_cluster(
    ClusterType = DWH_CLUSTER_TYPE,
    NodeType = DWH_NODE_TYPE,
    NumberOfNodes = int(DWH_NUM_NODES),
    DBName = DWH_DB,
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    MasterUsername = DWH_DB_USER,
    MasterUserPassword = DWH_DB_PASSWORD,
    IamRoles = [roleArn]
)

### Checking cluster status

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
                "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data = x, columns = ['Key', 'Value'])

myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

## Opening an incoming TCP port to access the cluster endpoint 

In [None]:
vpc = ec2.Vpc(id = myClusterProps['VpcId'])
defaultSg = list(vpc.security_groups.all())[0]
print(defaultSg)

defaultSg.authorize_ingress(
    GroupName = defaultSg.group_name,
    CidrIp = '0.0.0.0/0',
    IpProtocol = 'TCP',
    FromPort = int(DWH_PORT),
    ToPort = int(DWH_PORT)
)

## Connecting to the Cluster

In [None]:
%load_ext sql

In [None]:
conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    DWH_DB_USER,
    DWH_DB_PASSWORD,
    DWH_ENDPOINT,
    DWH_PORT,
    DWH_DB
)

%sql $conn_string

In [None]:
%%sql 
SELECT * 
FROM artists
LIMIT 10;

In [None]:
%%sql
SELECT *
FROM time
LIMIT 10;

In [None]:
%%sql
SELECT *
FROM songs
LIMIT 10;

In [None]:
%%sql
SELECT *
FROM songplays
ORDER BY songplay_id
LIMIT 10;

In [None]:
%%sql
delete from staging_events;
delete from staging_songs;
delete from songplays;
delete from users;
delete from songs;
delete from artists;
delete from time;

## Cleaning up resources

In [None]:
redshift.delete_cluster(
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True
)

myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER
                                            )['Clusters'][0]

In [None]:
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)