# IaC: Create S3 Bucket and Launch Redshift Cluster

In [None]:
import pandas as pd
import boto3
import configparser
import json
import re
import os
import time

## AWS Configuration Variables

In [None]:
# Load AWS credentials and Redshift (DWH) settings from the project config file.
config = configparser.ConfigParser()
# The context manager closes the file handle as soon as parsing is done.
with open('airflow/config/aws.cfg') as aws_cfg_file:
    config.read_file(aws_cfg_file)

# [AWS] section: account credentials, target bucket and region.
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
BUCKET                 = config.get('AWS','BUCKET')
REGION                 = config.get('AWS', 'REGION')

# [DWH] section: cluster sizing. Values are read as strings; DWH_NUM_NODES is
# converted with int() where the cluster is created.
DWH_CLUSTER_TYPE       = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# [DWH] section: cluster identity, database credentials and port.
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Summary table of the DWH settings, rendered as the cell output.
# NOTE(review): the table includes DWH_DB_PASSWORD in clear text -- clear the
# cell output before sharing or committing the notebook.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

# Instantiate AWS Resources

In [None]:
# Credentials and region shared by every boto3 handle created in this cell.
aws_connection_kwargs = {
    'aws_access_key_id': KEY,
    'aws_secret_access_key': SECRET,
    'region_name': REGION,
}

# Handles created through boto3.resource (EC2, S3).
ec2 = boto3.resource('ec2', **aws_connection_kwargs)
s3 = boto3.resource('s3', **aws_connection_kwargs)

# Handles created through boto3.client (IAM, Redshift).
iam = boto3.client('iam', **aws_connection_kwargs)
redshift = boto3.client('redshift', **aws_connection_kwargs)

## Create AWS S3 Sample Bucket

In [None]:
# create s3 bucket
bucket_kwargs = {'Bucket': BUCKET}
# us-east-1 is the default location and rejects an explicit LocationConstraint.
if REGION != 'us-east-1':
    bucket_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': REGION}

try:
    s3.create_bucket(**bucket_kwargs)
except s3.meta.client.exceptions.BucketAlreadyOwnedByYou:
    # Only the "re-running the notebook" case is tolerated; credential,
    # permission or naming errors propagate instead of being mislabelled.
    print('Bucket Already Exists')

# local path to sample_data
local_path = 'data/'

# for every file under local_path, add it to the s3 bucket
file_count = 0
for root, dirs, files in os.walk(local_path):
    for file in files:
        local_file_path = os.path.join(root, file)
        # Key = path relative to local_path, always '/'-separated. relpath
        # (unlike str.replace) cannot strip a 'data/' that merely occurs
        # deeper in the path, e.g. 'data/raw_data/x.csv'.
        bucket_file_path = os.path.relpath(local_file_path, local_path).replace(os.sep, '/')
        # Close each file handle once its upload has finished.
        with open(local_file_path, 'rb') as body:
            s3.Object(BUCKET, bucket_file_path).put(Body=body)

        file_count += 1
        if file_count % 10 == 0:
            print('Files Uploaded: {}'.format(file_count))

In [None]:
# list files within s3 bucket
bucket = s3.Bucket(BUCKET)
for key in bucket.objects.all():
    print(key.key)

# Create Role 

Before creating the role, make sure the AWS user defined in aws.cfg has permission to create roles and attach policies, or has administrative access. For simplicity, a user with administrative access is ideal. The cell below is for reference, for the scenario where explicit role policies need to be granted to specific user(s). The admin user has to enter their key and secret values in the cell below. The cell overrides the admin variable with an empty value after the policies are attached.

In [None]:
# IAM client for the ADMIN user: fill in the admin key and secret below.
admin = boto3.client('iam',
                   aws_access_key_id='',
                   aws_secret_access_key='',
                   region_name=REGION
                  )

# Customer-managed policy letting the pipeline user manage the Redshift role.
# The IAM role actions must be scoped to an IAM role ARN; scoped to an S3
# bucket ARN the statement would never match any request.
user_role_policy = json.dumps(
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:AttachRolePolicy",
                "iam:CreateRole",
                "iam:PutRolePolicy",
                "iam:GetRole",
                "iam:DetachRolePolicy",
                "iam:PassRole"
            ],
            "Resource": f"arn:aws:iam::*:role/{DWH_IAM_ROLE_NAME}"
        }
    ]
})

try:
    try:
        user_role_arn = admin.create_policy(
            PolicyName='RolePolicy',
            Path='/',
            PolicyDocument=user_role_policy,
            Description='Allows user to manage roles'
        )
    except admin.exceptions.EntityAlreadyExistsException as e:
        print(e)
        # The policy is left over from an earlier run: look it up so the
        # attach call below still has an ARN (previously a NameError).
        # The existing policy document is reused as-is, not updated.
        policy_pages = admin.get_paginator('list_policies').paginate(Scope='Local')
        existing_policy = next(
            candidate
            for page in policy_pages
            for candidate in page['Policies']
            if candidate['PolicyName'] == 'RolePolicy'
        )
        user_role_arn = {'Policy': existing_policy}

    response = admin.attach_user_policy(
        UserName=DWH_DB_USER,
        PolicyArn=user_role_arn['Policy']['Arn']
    )

    # AWS-managed policies the pipeline user also needs.
    user_policy_list = ['arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess', 'arn:aws:iam::aws:policy/AmazonRedshiftQueryEditor', 'arn:aws:iam::aws:policy/AmazonRedshiftFullAccess']
    for policy in user_policy_list:
        admin.attach_user_policy(
            UserName=DWH_DB_USER,
            PolicyArn=policy
        )
finally:
    # overwrite admin variable for security purposes, even when a call above failed
    admin = None

In [None]:
# Trust policy: lets the Redshift service and the IAM user named by
# DWH_DB_USER assume the role.
trust_principal = {
    'Service': 'redshift.amazonaws.com',
    'AWS': f"arn:aws:iam::501460770806:user/{DWH_DB_USER}",
}
trust_statement = {
    'Action': 'sts:AssumeRole',
    'Effect': 'Allow',
    'Principal': trust_principal,
}
role_trust_policy = json.dumps({'Statement': [trust_statement], 'Version': '2012-10-17'})

In [None]:
# Trust policy: lets the Redshift service and the IAM user named by
# DWH_DB_USER assume the role.
# NOTE(review): the account id in the user ARN is hard-coded -- confirm it
# matches the account the notebook runs against.
role_trust_policy = json.dumps(
{'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com', "AWS": f"arn:aws:iam::501460770806:user/{DWH_DB_USER}"}}],
               'Version': '2012-10-17'}
)

# create IAM role
print("Creating Role")
try:
    role = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=role_trust_policy
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-running the notebook is fine: the existing role is kept. Any other
    # failure (permissions, malformed policy) propagates so it is not
    # masked by a later "no such role" error from attach_role_policy.
    print(e)

print("Attaching Policy")

# response holds the HTTP status code of the attach call.
response = iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']


## Create or Resume Cluster

In [None]:
# ARN of the IAM role the cluster uses for S3 access. It was previously never
# defined, so create_cluster failed with a NameError that the blanket except
# printed and swallowed.
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

try:
    response = redshift.create_cluster(
        #Redshift cluster config
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # Listen on the configured port: the ingress rule and the connection
        # strings built later in the notebook all use DWH_PORT.
        Port=int(DWH_PORT),

        #Roles (for s3 access)
        IamRoles=[roleArn]
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # The cluster exists already (e.g. it is paused); the resume cell handles
    # that case. Other errors propagate instead of being hidden.
    print(e)

In [None]:
# resume cluster
try:
    redshift.resume_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
except Exception as e:
    print(e)

In [None]:
def prettyRedshiftProps(props):
    """Return selected Redshift cluster properties as a two-column DataFrame.

    Keyword Argument:
    props -- Cluster property dictionary (one entry of the 'Clusters' list
             returned by redshift.describe_clusters)

    Returns:
    DataFrame with columns "Key" and "Value", one row per property in
    keysToShow that is present in props, in the order props yields them.

    Side effect: sets the pandas 'display.max_colwidth' option so long values
    are not truncated when the frame is displayed.
    """
    # None disables truncation (pandas >= 1.0); the legacy -1 sentinel is
    # deprecated and raises ValueError on current pandas.
    pd.set_option('display.max_colwidth', None)
    keysToShow = ("ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId')
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Snapshot of the cluster's current description (first entry of 'Clusters').
cluster_descriptions = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_descriptions['Clusters'][0]
# prettyRedshiftProps(myClusterProps)

In [None]:
# Poll the cluster status once a minute and report when it is 'available'.
while True:
    described = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
    cluster_status = described['Clusters'][0]['ClusterStatus']
    if cluster_status == 'available':
        break
    time.sleep(60)
print('Cluster is Available')

In [None]:
# RUN CELL ONLY WHEN CLUSTER IS AVAILABLE
# Re-read the cluster description first: myClusterProps was captured before
# the wait loop, when the cluster may not yet have had an endpoint assigned.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

In [None]:
# Append the cluster endpoint and role ARN to aws.cfg so the airflow
# connection setup (connections.sh) can read them.
# NOTE(review): the lines are appended to the end of the file, so running this
# cell more than once adds duplicate keys.
appended_settings = (
    'DWH_HOST=' + DWH_ENDPOINT + '\n',
    'DWH_ROLE_ARN=' + DWH_ROLE_ARN + '\n',
)
with open('airflow/config/aws.cfg', 'a') as cfg:
    for setting_line in appended_settings:
        cfg.write(setting_line)

## Allow Inbound TCP port to Access Redshift Endpoint

In [None]:
# Open the Redshift port on the security group attached to the cluster.
# NOTE(review): 0.0.0.0/0 exposes the port to the whole internet -- restrict
# the CIDR to known addresses outside of a throw-away demo.
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    # Use the security group the cluster actually reports, not whichever
    # group happens to be listed first for the VPC.
    cluster_sg_id = myClusterProps['VpcSecurityGroups'][0]['VpcSecurityGroupId']
    defaultSg = ec2.SecurityGroup(cluster_sg_id)
    print(defaultSg)
    # The resource already identifies the group, so GroupName is not passed.
    defaultSg.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except ec2.meta.client.exceptions.ClientError as e:
    # AWS-side errors (typically: the rule already exists from an earlier
    # run) are reported but do not stop the notebook.
    print(e)

## Create Postgres Database for Airflow Backend

Task parallelization isn't available with the default SQLite backend and sequential executor. A Postgres database for the Airflow backend allows the local executor to be used for task parallelization.

Instructions:
- [Download Postgres UI App](https://www.postgresql.org/download/)
- Within Postgres query editor or psql terminal, run: CREATE DATABASE database_name;
- If you created the database with a user other than the default postgres user, set POSTGRES_USER below to that username


In [None]:
#uncomment cell below to install sql magic
# ! pip install ipython-sql
%load_ext sql

In [None]:
# Connection settings of the Postgres database created for the Airflow backend.
POSTGRES_USER = ''      # TODO: set, e.g. 'postgres' for the default user
POSTGRES_PASSWORD = ''  # TODO: set
POSTGRES_HOST = 'postgres'
POSTGRES_PORT = '5432'
POSTGRES_DB = ''        # TODO: set to the database created above

# Refuse to write an unusable connection string into airflow.cfg.
if not POSTGRES_USER or not POSTGRES_DB:
    raise ValueError('Set POSTGRES_USER and POSTGRES_DB before running this cell')

postgres_conn_string = "postgresql://{}:{}@{}:{}/{}".format(POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB)

# NOTE: this rebinds `config`, which previously held the aws.cfg parser.
config = configparser.ConfigParser(allow_no_value=True)
# The context manager closes the file handle once the config is parsed.
with open('airflow/config/airflow.cfg') as airflow_cfg_file:
    config.read_file(airflow_cfg_file)

#update to postgres string
config.set('core','sql_alchemy_conn', postgres_conn_string)
#write changes to config file (the with-block closes the file on exit)
with open('airflow/config/airflow.cfg', 'w') as configfile:
    config.write(configfile)

In [None]:
postgres_conn_string

In [None]:
%sql $postgres_conn_string

## Connect to Redshift Relational Database

In [None]:
redshift_conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
%sql $redshift_conn_string

## Fix STL Error Table

In the process of running the data pipeline, the staging tasks may raise STL load errors. The DAG handles these errors by transferring the error rows to a separate error table for each staging table that raises STL load errors. The staging tasks will still load the non-error rows into the designated tables, but the downstream staging_success_check tasks will fail, to prevent the fact tables from being created with data integrity issues.

Use the template code below to: fix any stl errors within the error tables, insert the fixed error tables into the related staging tables, and lastly drop the error table from the redshift data warehouse. After the error table is dropped, head back to the Airflow UI and re-run the staging_success_check task associated with the fixed error table. Re-running the staging_success_check will label the task as success to allow the downstream fact table tasks to be executed. After re-running the staging_success_check task, if the task is still labeled as failed, use the cells below to make sure the error table is dropped from Redshift.

In [None]:
%sql SELECT * FROM stl_load_errors LIMIT 5

In [None]:
# use for debugging error table
%sql SELECT * FROM data_science_video_log_errors LIMIT 5

In [None]:
# Names of the error table to repair and of the staging table it belongs to.
error_table = 'data_science_video_log_errors'
table = 'data_science_video_log'

# Template: the SET clause is intentionally left empty and must be completed
# with the column fixes before the statement is run.
update_sql = f"""UPDATE {error_table}
SET 
"""

# Template: the SELECT list is intentionally left empty (only a SQL comment)
# and must be completed with the casted columns before the statement is run.
insert_sql = f"""
INSERT INTO 
    {table}
SELECT 
/*  cast error columns to staging table data type */
FROM 
    {error_table}
"""

# Removes the error table once its rows have been re-inserted.
drop_sql = f"DROP TABLE {error_table}"

In [None]:
print(f'Fixing rows within {error_table}')
%sql $update_sql

In [None]:
print(f'Inserting fixed rows into {table}')
%sql $insert_sql


In [None]:
print(f'Dropping {error_table}')
%sql $drop_sql

# Table Samples

## Dimension Tables

In [None]:
%sql SELECT * FROM projects_dim LIMIT 10

In [None]:
%sql SELECT * FROM users_dim LIMIT 10

In [None]:
%sql SELECT * FROM videos_dim LIMIT 10

# Staging Tables

In [None]:
%sql SELECT * FROM data_science_project_feedback WHERE EXTRACT(MONTH FROM submit_date) = 2 LIMIT 10

In [None]:
%sql SELECT * FROM data_science_section_feedback LIMIT 10

In [None]:
%sql SELECT * FROM data_science_video_log LIMIT 10

In [None]:
%sql SELECT * FROM data_science_mentor_activity LIMIT 10

## Fact Tables

In [None]:
%sql SELECT * FROM data_science_highest_prompt_score LIMIT 10

In [None]:
%sql SELECT * FROM data_science_highest_answer_score LIMIT 10

In [None]:
%sql SELECT * FROM data_science_avg_video_views_per_user LIMIT 10

In [None]:
%sql select * FROM data_science_avg_video_view_range LIMIT 10

# Delete or Pause Resources

For the PostgreSQL connection, shut down the server through the PostgreSQL app or the psql terminal.

In [None]:
# pause cluster
try:
    pause_cluster = redshift.pause_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
except Exception as e:
    print(e)

In [None]:
#delete cluster
# response = redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

In [None]:
# Refresh the cluster description and display only its ClusterStatus row value.
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
cluster_summary = prettyRedshiftProps(myClusterProps)
is_status_row = cluster_summary['Key'] == 'ClusterStatus'
cluster_summary[is_status_row]['Value']

In [None]:
# Tear down the IAM role: the attached managed policy has to be detached
# before the role itself is deleted.
s3_read_only_policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
iam.detach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn=s3_read_only_policy_arn,
)
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)