# Manage Redshift Cluster
**Using AWS Python SDK**

In [1]:
import configparser
import psycopg2
import pandas as pd
import boto3
import json
import time
# import logging
import mylib
from mylib import logger

In [2]:
mylib.log_file_name()

'./logs/test-20190527.log'

In [3]:
logger.info('===[  Inititate Cluster  ]===')
logger.info(time.strftime('%Y-%m-%d  %I:%M:%S %p'))

## Load DWH, Cluster and DB Params from config file

In [4]:
CONFIG_FILE = 'dwh.cfg'
config = configparser.ConfigParser()
config.read_file(open(CONFIG_FILE))

In [5]:
def load_config():
    """
    Load DWH Cluster and DB Params from config file

    """

    DWH_CLUSTER_TYPE       = config['DWH']['DWH_CLUSTER_TYPE']
    DWH_NUM_NODES          = config['DWH']['DWH_NUM_NODES']
    DWH_NODE_TYPE          = config['DWH']['DWH_NODE_TYPE']
    DWH_CLUSTER_IDENTIFIER = config['DWH']['DWH_CLUSTER_IDENTIFIER']
    DWH_REGION             = config['DWH']['DWH_REGION']

    HOST                   = config['CLUSTER']['HOST']
    DB_NAME                = config['CLUSTER']['DB_NAME']
    DB_USER                = config['CLUSTER']['DB_USER']
    DB_PASSWORD            = config['CLUSTER']['DB_PASSWORD']
    DB_PORT                = config['CLUSTER']['DB_PORT']

    IAM_ROLE_NAME          = config['IAM_ROLE']['IAM_ROLE_NAME']
    IAM_POLICY_ARN         = config['IAM_ROLE']['IAM_POLICY_ARN']
    ARN                    = config['IAM_ROLE']['ARN']

    return DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, \
           DWH_CLUSTER_IDENTIFIER, DWH_REGION, \
           HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT, \
           IAM_ROLE_NAME, IAM_POLICY_ARN, ARN

In [6]:
DWH_CLUSTER_TYPE, \
DWH_NUM_NODES, \
DWH_NODE_TYPE, \
DWH_CLUSTER_IDENTIFIER, \
DWH_REGION, \
HOST, \
DB_NAME, \
DB_USER, \
DB_PASSWORD, \
DB_PORT, \
IAM_ROLE_NAME, \
IAM_POLICY_ARN, \
ARN  \
 = load_config()


params = pd.DataFrame({"Param":
                ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE",
                 "DWH_CLUSTER_IDENTIFIER", "DWH_REGION",
                 "HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT",
                 "IAM_ROLE_NAME", "IAM_POLICY_ARN", "ARN"
                ],
              "Value":
                [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, 
                 DWH_CLUSTER_IDENTIFIER, DWH_REGION,
                 HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT, 
                 IAM_ROLE_NAME, IAM_POLICY_ARN, ARN
                ]
            })
params

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_REGION,us-west-2
5,HOST,dwhcluster.cbsjbxldkge8.us-west-2.redshift.ama...
6,DB_NAME,sparkify
7,DB_USER,dwhuser
8,DB_PASSWORD,Passw0rd
9,DB_PORT,5439


In [7]:
# Write the config params to the LOGFILE
for i in range(params.shape[0]):
    logger.info('{}:  {}'.format(params.Param[i], params.Value[i]))

## Load AWS Params from separate config file

In [8]:
config_aws = configparser.ConfigParser()
config_aws.read_file(open('aws.cfg'))

AWS_KEY     = config_aws['AWS']['KEY']
AWS_SECRET  = config_aws['AWS']['SECRET']

## Create clients for EC2, S3, IAM, and Redshift

In [9]:
ec2 = boto3.resource('ec2',
                      region_name           = DWH_REGION,
                      aws_access_key_id     = AWS_KEY,
                      aws_secret_access_key = AWS_SECRET
                     )

s3 = boto3.resource('s3',
                     region_name           = DWH_REGION,
                     aws_access_key_id     = AWS_KEY,
                     aws_secret_access_key = AWS_SECRET
                    )

iam = boto3.client('iam',
                    region_name           = DWH_REGION,
                    aws_access_key_id     = AWS_KEY,
                    aws_secret_access_key = AWS_SECRET
                   )

redshift = boto3.client('redshift',
                         region_name           = DWH_REGION,
                         aws_access_key_id     = AWS_KEY,
                         aws_secret_access_key = AWS_SECRET
                       )


## Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [10]:
try:
    print('1.1 Creating a new IAM Role')
    logger.info('Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path                     = '/',
        RoleName                 = IAM_ROLE_NAME,
        Description              = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument = json.dumps(
            {'Statement' : [{'Action'    : 'sts:AssumeRole',
                             'Effect'    : 'Allow',
                             'Principal' : {'Service': 'redshift.amazonaws.com'}
                            }],
             'Version'   : '2012-10-17'
            }
        )
    )
except Exception as e:
    print(e)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.


In [11]:
print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName = IAM_ROLE_NAME,
                       PolicyArn = IAM_POLICY_ARN
                      )['ResponseMetadata']['HTTPStatusCode']

1.2 Attaching Policy


200

In [12]:
print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName = IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)
logger.info('New IAM Role ARN:  {}'.format(roleArn))

1.3 Get the IAM role ARN
arn:aws:iam::376450510082:role/dwhRole


## Create a RedShift Cluster

In [13]:
try:
    response = redshift.create_cluster(        
        # parameters for hardware
        ClusterType        = DWH_CLUSTER_TYPE,
        NodeType           = DWH_NODE_TYPE,
        NumberOfNodes      = int(DWH_NUM_NODES),
        
        # parameters for identifiers & credentials
        DBName             = DB_NAME,
        ClusterIdentifier  = DWH_CLUSTER_IDENTIFIER,
        MasterUsername     = DB_USER,
        MasterUserPassword = DB_PASSWORD,

        #  parameter for role (to allow s3 access)
        IamRoles           = [roleArn]
    )
except Exception as e:
    print(e)
finally:
    logger.info('Create cluster successful')

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


### Describe the cluster to see its status

In [14]:
def pretty_Redshift_props(props):
    """
    """
        
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", 
                  "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data = x, columns = ["Key", "Value"])


myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
dfClusterProps = pretty_Redshift_props(myClusterProps)
dfClusterProps

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,creating
3,MasterUsername,dwhuser
4,DBName,sparkify
5,VpcId,vpc-aa278fd2
6,NumberOfNodes,4


### WAIT...keep pinging until the cluster is running

In [15]:
# Write the cluster props to the LOGFILE
for i in range(dfClusterProps.shape[0]):
    logger.info('{}:  {}'.format(dfClusterProps.Key[i], dfClusterProps.Value[i]))

In [16]:
bClusterAvailable = False
nMinutes = 0

while True:
    time.sleep(60)
    nMinutes += 1
    print("waiting... {} minutes".format(nMinutes))

    # then check the status
    myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    
    if  myClusterProps['ClusterStatus'] == 'available':
        bClusterAvailable = True
        logger.info('ClusterStatus:  available')
        logger.info('time to spin cluster:  {} minutes'.format(nMinutes))
        break
    
    if nMinutes >= 7:
        print("7 minute time limit reached")
        logger.info('7 minute time limit reached')
        break
    
# cluster should be available now, OR the 7 minute limit has passed
pretty_Redshift_props(myClusterProps)

waiting... 1 minutes
waiting... 2 minutes
waiting... 3 minutes
waiting... 4 minutes


Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,sparkify
5,Endpoint,"{'Address': 'dwhcluster.cbsjbxldkge8.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-aa278fd2
7,NumberOfNodes,4


## Write the HOST and role ARN to the config file

In [17]:
if bClusterAvailable:
    HOST = myClusterProps['Endpoint']['Address']
    ARN  = myClusterProps['IamRoles'][0]['IamRoleArn']
    print("HOST :: ", HOST)
    print("ARN  :: ", ARN)

    config['CLUSTER']['HOST'] = HOST
    config['IAM_ROLE']['ARN'] = "\'{}\'".format(ARN)

    with open(CONFIG_FILE, 'w') as configfile:
        config.write(configfile)
        print("Saved to config file:  ", CONFIG_FILE)
        
    logger.info('HOST:  {}'.format(HOST))
    logger.info('ARN:   {}'.format(ARN))

HOST ::  dwhcluster.cbsjbxldkge8.us-west-2.redshift.amazonaws.com
ARN  ::  arn:aws:iam::376450510082:role/dwhRole
Saved to config file:   dwh.cfg


---
# Cluster is Live!!

## Open an incoming TCP port to access the cluster endpoint

In [18]:
try:
    vpc = ec2.Vpc(id = myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName  = defaultSg.group_name,
        CidrIp     = '0.0.0.0/0',
        IpProtocol = 'TCP',
        FromPort   = int(DB_PORT),
        ToPort     = int(DB_PORT)
    )
except Exception as e:
    print(e)

finally:
    logger.info('Opened TCP port on cluster endpoint')

ec2.SecurityGroup(id='sg-6a554720')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## Make sure you can connect to the cluster/database

In [19]:
try:
    conn_string = "host={} dbname={} user={} password={} port={}"
    conn_string = conn_string.format(*config['CLUSTER'].values())
    conn = psycopg2.connect( conn_string )
    cur = conn.cursor()
    print(conn_string)
    logger.info(conn_string)

except Exception as e:
    print(e)
    
finally:
    logger.info('connected to database:  {}'.format(DB_NAME))

host=dwhcluster.cbsjbxldkge8.us-west-2.redshift.amazonaws.com dbname=sparkify user=dwhuser password=Passw0rd port=5439


In [20]:
logger.info('Ready for ETL...')

---
# Run the ETL scripts...
>
> **...**  
> **...   create_tables.py**  
> **...   etl.py**  
> **...**  
>  

---
# Shutdown Cluster & Clean Up

In [21]:
import mylib
mylib.log_file_name()

'./logs/test-20190527.log'

In [22]:
# reset the logger just in case I've gone past midnight, and into the next date
# mylib.reset_logger()
logger.info('Shutting down Cluster...')

In [23]:
redshift.delete_cluster( ClusterIdentifier = DWH_CLUSTER_IDENTIFIER, 
                         SkipFinalClusterSnapshot = True )


myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
pretty_Redshift_props(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,sparkify
5,Endpoint,"{'Address': 'dwhcluster.cbsjbxldkge8.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-aa278fd2
7,NumberOfNodes,4


## WAIT... keep pinging until the cluster is deleted

In [24]:
bClusterDeleted = False
nMinutes = 0

try:
    while True:
        time.sleep(60)
        nMinutes += 1
        print("waiting... {} minutes".format(nMinutes))

        # then check the status
        myClusterProps = redshift.describe_clusters(ClusterIdentifier = DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

        if nMinutes >= 10:
            print("10 minute time limit reached. Cluster may not have been deleted yet.")
            break

        if  myClusterProps['ClusterStatus'] == 'deleting':
            continue

except Exception as e:
    bClusterDeleted = True
    print(e)
    print("Meaning the Cluster was successfully deleted.")

waiting... 1 minutes
waiting... 2 minutes
waiting... 3 minutes
waiting... 4 minutes
waiting... 5 minutes
waiting... 6 minutes
An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.
Meaning the Cluster was successfully deleted.


In [25]:
if bClusterDeleted:
    iam.detach_role_policy(RoleName = IAM_ROLE_NAME, 
                           PolicyArn = IAM_POLICY_ARN)
    iam.delete_role(RoleName = IAM_ROLE_NAME)

    logger.info('===[  Cluster Deleted  ]===')