In [None]:
import configparser
import boto3
import json
import pandas as pd
import psycopg2

In [None]:
# Parse the notebook's settings from redshift_configuration.cfg.
# The file is opened in a `with` block so the handle is closed as soon as
# parsing finishes instead of lingering for the lifetime of the kernel.
config = configparser.ConfigParser()
with open('redshift_configuration.cfg') as config_file:
    config.read_file(config_file)

In [None]:
# Every value below is read as a string from the parsed configuration,
# one group per section of the .cfg file.

# [IAM_USER] access key pair handed to the boto3 clients.
KEY, SECRET = (config.get('IAM_USER', option) for option in ('KEY', 'SECRET'))

# [CLUSTER] identity and sizing of the Redshift cluster.
CLUSTER_IDENTIFIER, CLUSTER_TYPE, NUM_NODES, NODE_TYPE = (
    config.get('CLUSTER', option)
    for option in ('CLUSTER_IDENTIFIER', 'CLUSTER_TYPE', 'NUM_NODES', 'NODE_TYPE')
)

# [DATABASE] database name, master credentials and port.
DB_NAME, DB_USER, DB_PASSWORD, DB_PORT = (
    config.get('DATABASE', option)
    for option in ('DB_NAME', 'DB_USER', 'DB_PASSWORD', 'DB_PORT')
)

# [IAM_ROLE] name of the role created and attached to the cluster later on.
IAM_ROLE_NAME = config.get('IAM_ROLE', 'IAM_ROLE_NAME')

### Clients for IAM, EC2, S3, and Redshift

In [None]:
# All four AWS handles use the same region and the same access key pair,
# so the shared keyword arguments are spelled out once and reused.
_aws_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Resource-style handles.
ec2 = boto3.resource('ec2', **_aws_kwargs)
s3 = boto3.resource('s3', **_aws_kwargs)

# Client-style handles.
iam = boto3.client('iam', **_aws_kwargs)
redshift = boto3.client('redshift', **_aws_kwargs)

### IAM ROLE
##### Create an IAM role that allows Redshift to read from S3 buckets (read-only access)

In [None]:
from botocore.exceptions import ClientError

# 1.1 Create the role that the Redshift service is allowed to assume.
try:
    # Report the configured role name rather than a hard-coded one, so the
    # message stays correct whatever IAM_ROLE_NAME is set to.
    print("1.1 Creating a new IAM Role called {}".format(IAM_ROLE_NAME))
    dwhRole = iam.create_role(
        Path='/',
        RoleName=IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps({
            'Statement': [{
                'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'},
            }],
            'Version': '2012-10-17',
        }),
    )
except ClientError as e:
    # Re-running this cell when the role already exists is fine: report it and
    # carry on. Any other AWS error (bad credentials, missing permissions, ...)
    # is re-raised instead of being silently printed, because the steps below
    # cannot succeed without the role.
    if e.response.get('Error', {}).get('Code') != 'EntityAlreadyExists':
        raise
    print(e)


# 1.2 Attach the AWS-managed S3 read-only policy to the role.
print("1.2 Attaching Policy")

# The HTTP status code is looked up but not displayed, since this expression
# is not the last one in the cell.
iam.attach_role_policy(RoleName=IAM_ROLE_NAME,PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")['ResponseMetadata']['HTTPStatusCode']

# 1.3 Look up the role's ARN; it is passed to the cluster at creation time.
print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=IAM_ROLE_NAME)['Role']['Arn']


print(roleArn)

### Create Redshift Cluster


In [None]:
# Request creation of the Redshift cluster described by the configuration.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=CLUSTER_TYPE,
        NodeType=NODE_TYPE,
        NumberOfNodes=int(NUM_NODES),

        # Identifiers & credentials
        DBName=DB_NAME,
        ClusterIdentifier=CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,

        # Create the cluster on the configured port. The connection cell
        # connects on DB_PORT, so leaving this out would only work when the
        # configuration happens to match the service default.
        Port=int(DB_PORT),

        # Roles (for S3 access)
        IamRoles=[roleArn],
    )
except ClientError as e:
    # An already-existing cluster is expected when the cell is re-run; report
    # it and continue. Every other AWS error is re-raised so that a failed
    # creation is not mistaken for success.
    if e.response.get('Error', {}).get('Code') != 'ClusterAlreadyExists':
        raise
    print(e)

### Describe the cluster to see its status 

In [None]:
def description(props):
    """Summarise selected cluster properties as a two-column DataFrame.

    Only the entries of ``props`` whose key is one of the properties of
    interest are kept; they appear in the iteration order of ``props``.

    Side effect: sets the pandas option ``display.max_colwidth`` to ``None``.

    Args:
        props: mapping of property name to value.

    Returns:
        A ``pandas.DataFrame`` with columns ``Key`` and ``Value``.
    """
    pd.set_option('display.max_colwidth', None)
    wanted = {
        "ClusterIdentifier",
        "NodeType",
        "ClusterStatus",
        "MasterUsername",
        "DBName",
        "Endpoint",
        "NumberOfNodes",
        'VpcId',
    }
    rows = []
    for name, value in props.items():
        if name in wanted:
            rows.append((name, value))
    return pd.DataFrame(data=rows, columns=['Key', 'Value'])
# Ask Redshift about our cluster and keep the first entry of the reply,
# then show its summary table as the cell output.
_describe_reply = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)
myClusterProps = _describe_reply['Clusters'][0]
description(myClusterProps)

In [None]:
# Host address of the cluster, used for the database connection.
# NOTE(review): presumably 'Endpoint' is only present once the cluster has
# finished creating; confirm before running this cell.
_endpoint_info = myClusterProps['Endpoint']
ENDPOINT = _endpoint_info['Address']

# ARN of the first IAM role attached to the cluster.
_attached_roles = myClusterProps['IamRoles']
ROLE_ARN = _attached_roles[0]['IamRoleArn']

### Connect to Redshift once the DAG is complete

In [None]:
# Open a connection to the cluster's database.
# Each parameter is passed by name, so the connection does not depend on the
# order (or number) of keys in the [DATABASE] section of the .cfg file, and
# values containing spaces or quotes cannot corrupt a hand-built DSN string.
conn = psycopg2.connect(
    host=ENDPOINT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    port=DB_PORT,
)
cur = conn.cursor()

### Quality checks & analysis

### Clean up AWS resources 

In [None]:
# Request deletion of the cluster, skipping the final snapshot.
# The service reply is the cell output.
redshift.delete_cluster(
    ClusterIdentifier=CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True,
)

In [None]:
# Query the cluster again after the delete request and show its summary
# table (which includes ClusterStatus) as the cell output.
_post_delete_reply = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)
myClusterProps2 = _post_delete_reply['Clusters'][0]
description(myClusterProps2)