# IaC for data warehouse
This notebook provides the tools to create, delete, and query an Amazon Redshift cluster for the data warehouse project.

In [1]:
import pandas as pd
import boto3
import json
import configparser
import psycopg2
import time
from botocore.exceptions import ClientError

In [2]:
def prettyRedshiftProps(props):
    """Summarise selected Redshift cluster properties as a two-column table.

    Args:
        props: Mapping of cluster property names to values, such as one
            entry of the ``Clusters`` list returned by
            ``redshift.describe_clusters``.

    Returns:
        pandas.DataFrame with the columns ``Key`` and ``Value``. It holds one
        row per entry of ``props`` whose key is in the allow-list below, in
        the iteration order of ``props``. An empty mapping yields an empty
        frame.

    Side effects:
        Sets the global pandas option ``display.max_colwidth`` so long values
        (e.g. the endpoint dict) are displayed without truncation.
    """
    # None disables truncation. The legacy value -1 has been deprecated since
    # pandas 1.0 and is rejected by newer releases.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ("ClusterIdentifier", "NodeType",
                  "ClusterStatus", "MasterUsername", "DBName",
                  "Endpoint", "NumberOfNodes", 'VpcId')
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

## Configuration and boto3 clients
Always run these cells first: the sections below rely on the configuration values and boto3 clients defined here.

In [3]:
config = configparser.ConfigParser()

# Cluster settings. The context manager closes the file handle as soon as
# parsing is done instead of leaving it open for the kernel's lifetime.
with open('iac.cfg') as iac_cfg_file:
    config.read_file(iac_cfg_file)

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# AWS credentials are kept in a separate file and merged into the same parser.
with open('key.cfg') as key_cfg_file:
    config.read_file(key_cfg_file)

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

In [4]:
# Build the IAM and Redshift clients with the same credentials and region.
iam, redshift = (
    boto3.client(
        service_name,
        region_name='us-west-2',
        aws_access_key_id=KEY,
        aws_secret_access_key=SECRET,
    )
    for service_name in ('iam', 'redshift')
)

## Create IAM role and Redshift cluster
Only run these cells if there is currently no cluster running that can be used for the project.

In [8]:
# Create new IAM role whose trust policy lets the Redshift service assume it
try:
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    # Re-running the cell when the role already exists is harmless; any other
    # error (bad credentials, missing permissions, ...) must not be hidden.
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print(e)


# Attach the AmazonS3ReadOnlyAccess policy to the IAM role
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )


# Get the IAM role ARN
DWH_ROLE_ARN = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(DWH_ROLE_ARN)

arn:aws:iam::986480943738:role/dwhRole


In [9]:
# Initialize new Redshift cluster
cluster_params = dict(
    # Hardware
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,

    # Identifiers & credentials
    DBName=DWH_DB,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,
    # Create the cluster on the same port that is later exported to dwh.cfg
    Port=int(DWH_PORT),

    # Role that grants the cluster S3 read access
    IamRoles=[DWH_ROLE_ARN],
)
# NumberOfNodes is only meaningful for multi-node clusters, so it is passed
# only when the configured cluster type asks for one.
if DWH_CLUSTER_TYPE == 'multi-node':
    cluster_params['NumberOfNodes'] = int(DWH_NUM_NODES)

try:
    response = redshift.create_cluster(**cluster_params)
except ClientError as e:
    # An already existing cluster is fine (the cell was re-run); every other
    # failure is re-raised instead of being swallowed.
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

# Monitor cluster until it becomes available, but give up after a deadline
# so a cluster stuck in a non-available state cannot block the kernel forever.
CLUSTER_WAIT_TIMEOUT_S = 30 * 60
CLUSTER_POLL_INTERVAL_S = 5
wait_deadline = time.monotonic() + CLUSTER_WAIT_TIMEOUT_S
while True:
    cluster_props = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    if cluster_props['ClusterStatus'] == 'available':
        break
    if time.monotonic() > wait_deadline:
        raise TimeoutError(
            "Cluster '{}' did not become available within {} seconds (last status: {})".format(
                DWH_CLUSTER_IDENTIFIER, CLUSTER_WAIT_TIMEOUT_S, cluster_props['ClusterStatus']))
    time.sleep(CLUSTER_POLL_INTERVAL_S)

# Get the cluster endpoint
DWH_ENDPOINT = cluster_props['Endpoint']['Address']
prettyRedshiftProps(cluster_props)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,dwh
5,Endpoint,"{'Address': 'redshift-cluster.ck9s1la24v5d.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-5aadc322
7,NumberOfNodes,1


In [10]:
# Export dwh configuration file
dwh_section = config['DWH']

config_dwh = configparser.ConfigParser()
# Section and key order is significant: downstream code reads the CLUSTER
# values positionally.
config_dwh.read_dict({
    'CLUSTER': {
        'HOST': DWH_ENDPOINT,
        'DB_NAME': dwh_section['DWH_DB'],
        'DB_USER': dwh_section['DWH_DB_USER'],
        'DB_PASSWORD': dwh_section['DWH_DB_PASSWORD'],
        'DB_PORT': dwh_section['DWH_PORT'],
    },
    'IAM_ROLE': {'ARN': DWH_ROLE_ARN},
    # Copy the S3 section over unchanged
    'S3': config['S3'],
})

with open('dwh.cfg', 'w') as config_dwh_file:
    config_dwh.write(config_dwh_file)

## Run queries on the Redshift cluster

In [12]:
# Read DWH config
config_conn = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty list means dwh.cfg is missing.
loaded_files = config_conn.read('dwh.cfg')
if not loaded_files:
    raise FileNotFoundError("dwh.cfg not found - run the export cell of the cluster creation section first")
loaded_files

['dwh.cfg']

In [13]:
# Run a query

query = ('''
SELECT * 
FROM stl_load_errors 
''')

# Pass the connection settings by name instead of relying on the order of the
# values in dwh.cfg; this also copes with values that contain spaces.
cluster_cfg = config_conn['CLUSTER']
conn = psycopg2.connect(
    host=cluster_cfg['HOST'],
    dbname=cluster_cfg['DB_NAME'],
    user=cluster_cfg['DB_USER'],
    password=cluster_cfg['DB_PASSWORD'],
    port=cluster_cfg['DB_PORT'],
)
try:
    df = pd.read_sql_query(query, conn)
finally:
    # Always release the connection, even when the query fails
    conn.close()

df

Unnamed: 0,userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason


## Delete IAM role and Redshift cluster
This section only needs the DWH_CLUSTER_IDENTIFIER and DWH_IAM_ROLE_NAME values defined in iac.cfg; the role ARN and the cluster endpoint are not required.

In [14]:
# Delete Redshift cluster (no final snapshot is taken)
redshift.delete_cluster(
    SkipFinalClusterSnapshot=True,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
);

In [15]:
# Make sure the cluster has been deleted
try:
    cluster_props = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
except ClientError as e:
    # Once deletion has finished the cluster can no longer be described;
    # that is the success case here, not an error.
    if e.response['Error']['Code'] != 'ClusterNotFound':
        raise
    cluster_props = None
    print("Cluster '{}' no longer exists.".format(DWH_CLUSTER_IDENTIFIER))

# Show the remaining cluster state (e.g. 'deleting') while it still exists
prettyRedshiftProps(cluster_props) if cluster_props is not None else None

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,awsuser
4,DBName,dwh
5,Endpoint,"{'Address': 'redshift-cluster.ck9s1la24v5d.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-5aadc322
7,NumberOfNodes,1


In [16]:
# Delete IAM role: detach the S3 read-only policy, then remove the role itself
iam.detach_role_policy(
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
    RoleName=DWH_IAM_ROLE_NAME,
);
iam.delete_role(
    RoleName=DWH_IAM_ROLE_NAME,
);