In [None]:
import configparser
import json
from urllib.parse import quote

import boto3
import pandas as pd
from botocore.exceptions import ClientError

from sql_queries import create_table_queries, drop_table_queries, copy_table_queries, insert_table_queries

# 1. Create the Redshift cluster and its dependencies

## STEP 0: Make sure `dwh.cfg` contains your AWS access key and secret key (in the `[AWS]` section)

#### Load the DWH params from `dwh.cfg`

In [None]:
# Read the configured params. ConfigParser.read silently ignores missing files,
# so fail loudly here instead of with a confusing NoSectionError further down.
config = configparser.ConfigParser()
if not config.read('dwh.cfg'):
    raise FileNotFoundError("dwh.cfg not found in the current working directory")

# AWS
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# DWH
DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_PASSWORD = config.get('DWH', 'DWH_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')

# Show the cluster params for a quick sanity check. The password is masked so it
# never ends up in the notebook's saved output.
dwh_params = {
    'DWH_CLUSTER_TYPE': DWH_CLUSTER_TYPE,
    'DWH_NUM_NODES': DWH_NUM_NODES,
    'DWH_NODE_TYPE': DWH_NODE_TYPE,
    'DWH_CLUSTER_IDENTIFIER': DWH_CLUSTER_IDENTIFIER,
    'DWH_DB': DWH_DB,
    'DWH_DB_USER': DWH_DB_USER,
    'DWH_PASSWORD': '********',
    'DWH_PORT': DWH_PORT,
    'DWH_IAM_ROLE_NAME': DWH_IAM_ROLE_NAME,
}
pd.DataFrame(data={'Params': list(dwh_params), 'Value': list(dwh_params.values())})

#### Create clients for IAM, EC2, S3 and Redshift

In [None]:
# Every AWS client/resource shares one region and one credential set. The region
# can be overridden with REGION in the [AWS] section of dwh.cfg; otherwise it
# defaults to us-west-2, as before.
REGION = config.get('AWS', 'REGION', fallback='us-west-2')
aws_session_kwargs = dict(region_name=REGION, aws_access_key_id=KEY, aws_secret_access_key=SECRET)

ec2 = boto3.resource('ec2', **aws_session_kwargs)

s3 = boto3.resource('s3', **aws_session_kwargs)

iam = boto3.client('iam', **aws_session_kwargs)

redshift = boto3.client('redshift', **aws_session_kwargs)

## STEP 1: IAM role
- Create an IAM role that allows Redshift to read from S3 (read-only access)

In [None]:
# - Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description='Allows Redshift clusters to call AWS service on your behalf.',
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    # Reported rather than raised (e.g. the role may already exist from a
    # previous run); the steps below look the role up by name either way.
    print(e)

print('1.2 Attaching Policy')
# The status code used to be computed mid-cell and silently discarded; show it.
attach_status = iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
)['ResponseMetadata']['HTTPStatusCode']
print(f'    HTTP status: {attach_status}')

print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

## STEP 2: Redshift cluster

- Create a Redshift cluster
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [None]:
# Create a Redshift cluster with the S3 read-only role attached.
# Port is passed explicitly: DWH_PORT is also used for the security-group rule
# and the connection string, so the cluster must actually listen on it.
try:
    response = redshift.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_PASSWORD,
        Port=int(DWH_PORT),
        IamRoles=[roleArn],
    )
except ClientError as e:
    # Reported rather than raised, e.g. when the cluster already exists.
    print(e)

### 2.1 *Describe* the cluster to see its status
- Re-run this cell until the cluster status becomes `available`

In [None]:
def prettyRedshiftProps(props):
    """Tabulate the interesting fields of a Redshift cluster description.

    Args:
        props: one entry of ``describe_clusters()['Clusters']`` (a dict).

    Returns:
        A two-column DataFrame (``Key``, ``Value``) holding only the keys listed
        in ``keysToShow``, in the order they appear in ``props``.

    Side effect: sets pandas' global ``display.max_colwidth`` to unlimited so
    nested values such as ``Endpoint`` are not truncated when displayed.
    """
    # None means "no limit"; the old -1 sentinel is deprecated since pandas 1.0
    # and rejected by pandas 2.x.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Look up our cluster by identifier; the response lists exactly the one match.
cluster_description = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_description['Clusters'][0]
prettyRedshiftProps(myClusterProps)

### 2.2 Take note of the cluster <font color='red'>endpoint and role ARN</font>

<font color='red'>DO NOT RUN THIS until the cluster status is `available`</font>

In [None]:
# Only meaningful once the cluster status is "available".
# Re-describe the cluster here: the myClusterProps captured in step 2.1 is often a
# stale snapshot taken while the cluster was still creating and had no Endpoint.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
if myClusterProps['ClusterStatus'] != 'available':
    raise RuntimeError(
        f"Cluster status is {myClusterProps['ClusterStatus']!r}; "
        "wait until it is 'available' before running this cell."
    )

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

### 2.3 Write the [CLUSTER] and [IAM_ROLE] sections
- Persist the cluster connection params and the role ARN back to `dwh.cfg`

In [None]:
# Drop any [CLUSTER] / [IAM_ROLE] sections left over from a previous run
# (remove_section is a no-op when a section is absent), then rebuild both
# from the freshly created cluster and save everything back to dwh.cfg.
for stale_section in ('CLUSTER', 'IAM_ROLE'):
    config.remove_section(stale_section)

config['CLUSTER'] = {
    'HOST': DWH_ENDPOINT,
    'DB_NAME': DWH_DB,
    'DB_USER': DWH_DB_USER,
    'DB_PASSWORD': DWH_PASSWORD,
    'DB_PORT': DWH_PORT,
}
config['IAM_ROLE'] = {'ARN': DWH_ROLE_ARN}

with open('dwh.cfg', 'w') as cfg_file:
    config.write(cfg_file)

## STEP 3: Open an incoming TCP port to access the cluster endpoint

In [None]:
# Allow inbound TCP traffic on the cluster port through the VPC's default
# security group (the group a cluster created without explicit
# VpcSecurityGroupIds is placed in).
# NOTE: 0.0.0.0/0 exposes the port to the whole internet. Narrow this CIDR
# (e.g. to your own IP/32) for anything beyond a short-lived exercise.
INGRESS_CIDR = '0.0.0.0/0'

try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    # Select the group literally named 'default' rather than whichever group
    # the API happens to list first.
    defaultSg = next((sg for sg in vpc.security_groups.all() if sg.group_name == 'default'), None)
    if defaultSg is None:
        raise RuntimeError(f"No 'default' security group found in {vpc.id}")
    print(defaultSg)
    # The resource action fills in GroupId itself. GroupName is omitted because
    # EC2 only accepts it for the default VPC.
    defaultSg.authorize_ingress(
        CidrIp=INGRESS_CIDR,
        IpProtocol='tcp',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT),
    )
except ClientError as e:
    # Reported rather than raised, e.g. when the rule already exists from a previous run.
    print(e)

## STEP 4: Make sure you can connect to the cluster

In [None]:
get_ipython().run_line_magic('load_ext', 'sql')
# Percent-encode the password so characters such as '@', '/' or ':' cannot
# break the URL. quote() (not quote_plus) is used because URL parsers decode
# the password with plain unquoting.
conn_string = f'postgresql://{DWH_DB_USER}:{quote(DWH_PASSWORD, safe="")}@{DWH_ENDPOINT}:{DWH_PORT}/{DWH_DB}'
# Echo the target without the password so it does not land in saved outputs.
print(f'postgresql://{DWH_DB_USER}:********@{DWH_ENDPOINT}:{DWH_PORT}/{DWH_DB}')
get_ipython().run_line_magic('sql', '$conn_string')

# 2. ETL
1. Drop the tables if they exist
2. Create the tables
3. Load the staging tables (`copy_table_queries`)
4. Insert into the final tables (`insert_table_queries`)

In [None]:
def manipulate_tables(table_queries):
    """Execute every SQL statement in ``table_queries``, in order, via the %sql line magic."""
    shell = get_ipython()
    for statement in table_queries:
        shell.run_line_magic('sql', statement)


# ETL stages, in dependency order: drop tables -> create tables ->
# load staging tables -> insert into tables.
for stage_queries in (drop_table_queries, create_table_queries, copy_table_queries, insert_table_queries):
    manipulate_tables(stage_queries)

### Check tables

In [None]:
# Query information_schema for every base table (views excluded) in the public
# schema, sorted by name; the result is iterated by the next cell.
test_query = """
select t.table_name
from information_schema.tables t
where t.table_schema = 'public'  -- put schema name here
      and t.table_type = 'BASE TABLE'
order by t.table_name;
"""
ipython_shell = get_ipython()
tables = ipython_shell.run_line_magic('sql', test_query)

In [None]:
# Print a row count for each table returned by the previous cell
# (each result row holds the table name in its first column).
for row in tables:
    table_name = row[0]
    count = get_ipython().run_line_magic('sql', f'select count(*) as count_in_{table_name} from {table_name}')
    print(count)

# 3. Clean up your resources

In [None]:
# Clean up your resources: delete the cluster without a final snapshot.
try:
    redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
except ClientError as e:
    # Reported rather than raised, e.g. when the cell is re-run while the
    # cluster is already being deleted.
    print(e)

# Check that deletion has started (the status should read 'deleting').
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Detach the managed S3 read-only policy from the role, then delete the role itself.
s3_read_only_policy_arn = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn=s3_read_only_policy_arn)
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

In [None]:
# Confirm the deletion. Once the cluster is fully gone, describe_clusters raises
# ClusterNotFoundFault; that is the success case, so report it instead of crashing.
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    cluster_summary = prettyRedshiftProps(myClusterProps)
except redshift.exceptions.ClusterNotFoundFault:
    cluster_summary = f'Cluster {DWH_CLUSTER_IDENTIFIER!r} not found - deletion is complete.'
cluster_summary