## Infrastructure-as-code set-up using AWS's SDK for Python

In [1]:
# load libraries
import pandas as pd
import boto3
import botocore
import json
import configparser
import sql

### Configuration file

In [2]:
# load DWH paramters from a local file
config = configparser.ConfigParser()
config.read_file(open('airflow.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
REGION                 = config.get('AWS','REGION')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES",
                   "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER",
                    "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD",
                    "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES,
                   DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, 
                   DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, 
                   DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,airflow
5,DWH_DB_USER,airflow-user
6,DWH_DB_PASSWORD,PAssword1234
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,neidhpa-airflow-project


### AWS Clients

In [3]:
# create clients for IAM, EC2, S3 and Redshift
ec2 = boto3.resource('ec2',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.client('s3',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name=REGION
                  )

redshift = boto3.client('redshift',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

### IAM Role

In [4]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        print("Role already exists.")
    else:
        raise e
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonRedshiftFullAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
Role already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::302422927713:role/neidhpa-airflow-project


#### Create Redshift cluster

In [5]:

try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )

    # Wait for cluster creation to complete
    redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
    
    # Retrieve the endpoint of the cluster
    cluster_props = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    DWH_ENDPOINT = cluster_props['Endpoint']['Address']
    
    print("Cluster created successfully!")
    print("DWH_ENDPOINT:", DWH_ENDPOINT)

except Exception as e:
    print(e)

Cluster created successfully!
DWH_ENDPOINT: dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com


In [6]:
def RedshiftProps(props):
    pd.set_option('display.max_colwidth', None)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus",
                "MasterUsername", "DBName", "Endpoint",
                "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters\
    (ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
RedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,airflow-user
4,DBName,airflow
5,Endpoint,"{'Address': 'dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0478d8d5f8fd4539b
7,NumberOfNodes,4


In [7]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::302422927713:role/neidhpa-airflow-project


In [8]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    # Check if the rule already exists before adding
    existing_rules = defaultSg.ip_permissions
    print("Existing rules:")
    for rule in existing_rules:
        print(rule)
        if (
            rule['IpProtocol'].lower() == 'tcp' and
            rule['FromPort'] == int(DWH_PORT) and
            rule['ToPort'] == int(DWH_PORT) and
            any(x['CidrIp'] == '0.0.0.0/0' for x in rule['IpRanges'])
        ):
            print("Rule already exists")
            break
    else:
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )
        
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-05ad93b78bd502d9e')
Existing rules:
{'FromPort': 5439, 'IpProtocol': 'tcp', 'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'Allow traffic from anywhere in the world'}], 'Ipv6Ranges': [], 'PrefixListIds': [], 'ToPort': 5439, 'UserIdGroupPairs': []}
Rule already exists


In [9]:
%load_ext sql

In [10]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://airflow-user:PAssword1234@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow


'Connected: airflow-user@airflow'

### Create Redshift tables

In [16]:
create_staging_events_table = """
    DROP TABLE IF EXISTS staging_events;
    CREATE TABLE IF NOT EXISTS staging_events (
        artist VARCHAR,
        auth VARCHAR,
        firstName VARCHAR,
        gender VARCHAR,
        itemInSession INTEGER,
        lastName VARCHAR,
        length FLOAT,
        level VARCHAR,
        location VARCHAR,
        method VARCHAR,
        page VARCHAR,
        registration FLOAT,
        sessionId INTEGER,
        song VARCHAR,
        status INTEGER,
        ts BIGINT DISTKEY,
        userAgent VARCHAR,
        userId INTEGER
    );
    """

create_staging_songs_table = """
    DROP TABLE IF EXISTS staging_songs;
    CREATE TABLE IF NOT EXISTS staging_songs (
        num_songs INTEGER,
        artist_id VARCHAR(18) DISTKEY,
        artist_latitude FLOAT,
        artist_longitude FLOAT,
        artist_location VARCHAR,
        artist_name VARCHAR,
        song_id VARCHAR(18),
        title VARCHAR,
        duration FLOAT,
        year INTEGER
    );
    """

create_songplays_table = """
    DROP TABLE IF EXISTS songplays;
    CREATE TABLE IF NOT EXISTS songplays (
        songplay_id INTEGER IDENTITY(0,1) PRIMARY KEY,
        start_time TIMESTAMP NOT NULL,
        user_id INTEGER NOT NULL,
        level VARCHAR,
        song_id VARCHAR NOT NULL,
        artist_id VARCHAR NOT NULL,
        session_id INTEGER,
        location VARCHAR,
        user_agent TEXT
    );
    """

create_users_table = """
    DROP TABLE IF EXISTS users;
    CREATE TABLE IF NOT EXISTS users (
        user_id INTEGER PRIMARY KEY DISTKEY,
        first_name VARCHAR,
        last_name VARCHAR,
        gender CHAR(1),
        level VARCHAR(5)
    );
    """

create_songs_table = """
    DROP TABLE IF EXISTS songs;
    CREATE TABLE IF NOT EXISTS songs (
        song_id VARCHAR PRIMARY KEY,
        title VARCHAR NOT NULL,
        artist_id VARCHAR NOT NULL DISTKEY,
        year INTEGER,
        duration FLOAT
    );
    """

create_artists_table = """
    DROP TABLE IF EXISTS artists;
    CREATE TABLE IF NOT EXISTS artists (
        artist_id VARCHAR PRIMARY KEY DISTKEY,
        name VARCHAR NOT NULL,
        location VARCHAR,
        latitude FLOAT,
        longitude FLOAT
    );
    """

create_time_table = """
    DROP TABLE IF EXISTS time;
    CREATE TABLE IF NOT EXISTS time (
        start_time TIMESTAMP PRIMARY KEY DISTKEY,
        hour INTEGER,
        day INTEGER,
        week INTEGER,
        month INTEGER,
        year INTEGER,
        weekday INTEGER
    );
    """

In [17]:
%sql $create_staging_events_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [18]:
%sql $create_staging_songs_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [19]:
%sql $create_songplays_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [20]:
%sql $create_users_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [21]:
%sql $create_songs_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [22]:
%sql $create_artists_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]

In [23]:
%sql $create_time_table

 * postgresql://airflow-user:***@dwhcluster.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/airflow
Done.
Done.


[]