# IaC: Create S3 Bucket and Launch Redshift Cluster

In [None]:
import pandas as pd
import boto3
import configparser
import json
import re
import os
import time
from io import StringIO

## AWS Configuration Variables

In [None]:
# Load AWS and data-lake settings from the project's config file.
# The context manager closes the file handle once parsing is done.
config = configparser.ConfigParser()
with open('airflow/config/aws.cfg') as aws_cfg_file:
    config.read_file(aws_cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')
BUCKET = config.get('AWS', 'BUCKET')
REGION = config.get('AWS', 'REGION')
CRAWLER = config.get('AWS', 'CRAWLER')

DL_DB_USER = config.get('DL', 'DL_DB_USER')
DL_DB_PASSWORD = config.get('DL', 'DL_DB_PASSWORD')
DL_DB = config.get('DL', 'DL_DB')
DL_IAM_ROLE_NAME = config.get('DL', 'DL_IAM_ROLE_NAME')

# Display the DL settings as the cell's output.
# NOTE(review): this shows DL_DB_PASSWORD in clear text; avoid committing the
# rendered notebook output.
pd.DataFrame({"Param":
                  ["DL_DB", "DL_DB_USER", "DL_DB_PASSWORD", "DL_IAM_ROLE_NAME"],
              "Value":
                  [DL_DB, DL_DB_USER, DL_DB_PASSWORD, DL_IAM_ROLE_NAME]
             })

# Instantiate AWS Resources

In [None]:
# One session carries the credentials and region from aws.cfg. Every client
# and resource is derived from it, so all of them use identical settings.
session = boto3.Session(aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET,
                        region_name=REGION)

s3 = session.resource('s3')
iam = session.client('iam')
glue = session.client('glue')

## Create AWS Glue Crawler

## Fix Data Type issues in staging tables

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Glue context wrapping the active (or a newly created) SparkContext.
glueContext = GlueContext(SparkContext.getOrCreate())

# Load the "medicare" table from the "payments" catalog database and print
# the schema Glue inferred for it.
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
    database="payments",
    table_name="medicare",
)
medicare_dynamicframe.printSchema()

# Force the "provider id" column to long (presumably it was inferred as an
# ambiguous/choice type), then print the resolved schema.
medicare_res = medicare_dynamicframe.resolveChoice(
    specs=[("provider id", "cast:long")],
)
medicare_res.printSchema()

## Create AWS S3 Sample Bucket

In [None]:
# Unzip data.zip (~35 MB) into the working directory. The upload cell below walks 'data/'.
# NOTE(review): this comment previously said the archive creates sample_data/ - confirm the archive's top-level folder matches local_path.
! unzip data.zip

In [None]:
# Create the S3 bucket. Outside us-east-1 the region must be passed as a
# LocationConstraint. S3 rejects an explicit 'us-east-1' constraint, so the
# configuration is omitted there.
create_bucket_kwargs = {'Bucket': BUCKET}
if REGION != 'us-east-1':
    create_bucket_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': REGION}
try:
    s3.create_bucket(**create_bucket_kwargs)
except Exception as e:
    # Best effort: e.g. the bucket already exists from a previous run.
    print(e)

# local path to the unzipped sample data
local_path = 'data/'

# Upload every file under local_path. The S3 key is the file's path relative
# to local_path, always with '/' separators regardless of the local OS.
file_count = 0
print_every = 100
for root, dirs, files in os.walk(local_path):
    for file in files:
        if file == '.DS_Store':  # macOS Finder metadata, not data
            continue
        local_file_path = os.path.join(root, file)
        bucket_file_path = os.path.relpath(local_file_path, local_path).replace(os.sep, '/')
        # Close each file after upload; thousands of files would otherwise
        # leak one handle apiece.
        with open(local_file_path, 'rb') as body:
            s3.Object(BUCKET, bucket_file_path).put(Body=body)

        file_count += 1
        if file_count % print_every == 0:
            print('Files Uploaded: {}'.format(file_count))

# Create and Attach IAM Role

Before creating the role, make sure the AWS user defined in aws.cfg either has permission to create roles and attach policies or has administrative access. For simplicity, a user with administrative access is ideal. The cell below is for reference, for scenarios where explicit role-management policies must be granted to specific user(s). An admin user must enter their key and secret values in the cell below. After the policies are attached, the cell overwrites the admin variable with None.

In [None]:
# IAM client authenticated as an administrator. Fill in the admin key and
# secret before running this cell.
admin = boto3.client('iam',
                     aws_access_key_id='',
                     aws_secret_access_key='',
                     region_name=REGION
                     )

# Permissions the user needs to run the role/policy cell below. IAM actions
# are authorized against IAM resources, not S3 ARNs. The policy therefore
# targets the Glue role and the customer-managed policy that cell creates
# (in any account). iam:CreatePolicy is required because that cell calls
# create_policy.
user_role_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:AttachRolePolicy",
                    "iam:CreateRole",
                    "iam:PutRolePolicy",
                    "iam:GetRole",
                    "iam:DetachRolePolicy",
                    "iam:PassRole",
                    "iam:CreatePolicy"
                ],
                "Resource": [
                    f"arn:aws:iam::*:role/{DL_IAM_ROLE_NAME}",
                    "arn:aws:iam::*:policy/AWSGlue-Sparkify"
                ]
            }
        ]
    }
)

# Create the user policy, then attach it to the user. The attach step runs
# only if creation succeeded; otherwise there is no policy ARN to attach.
try:
    user_role_arn = admin.create_policy(
        PolicyName='RolePolicy',
        Path='/',
        PolicyDocument=user_role_policy,
        Description='Allows user to manage roles')
except Exception as e:
    print(e)
    print('Skipping attach_user_policy: policy was not created')
else:
    # NOTE(review): assumes DL_DB_USER (from the [DL] section) is the IAM
    # user name - confirm against aws.cfg.
    try:
        response = admin.attach_user_policy(
            UserName=DL_DB_USER,
            PolicyArn=user_role_arn['Policy']['Arn'])
    except Exception as e:
        print(e)

# overwrite the admin client so admin credentials aren't left in the session
admin = None

In [None]:
# S3 permissions for the Glue crawler role: read/write objects in the project
# bucket only. "arn:aws:s3:::{BUCKET}/*" matches that bucket's objects. A bare
# "{BUCKET}*" prefix would also match other buckets whose names start with BUCKET.
crawler_s3_access_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"arn:aws:s3:::{BUCKET}/*"
                ]
            }
        ]
    }
)

# Trust policy allowing the Glue service to assume the crawler role.
role_trust_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "glue.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
)

# create IAM role (best effort: on a re-run the role already exists)
print("Creating Role")
try:
    role = iam.create_role(
        Path='/',
        RoleName=DL_IAM_ROLE_NAME,
        Description="Allows Glue Crawler to call AWS services on your behalf.",
        AssumeRolePolicyDocument=role_trust_policy
    )
except Exception as e:
    print(e)

print("Attaching Policy")
sparkify_policy_name = 'AWSGlue-Sparkify'
try:
    sparkify_policy = iam.create_policy(
        PolicyName=sparkify_policy_name,
        Path='/',
        PolicyDocument=crawler_s3_access_policy,
        Description='Allows user to access sparkify bucket')
    sparkify_policy_arn = sparkify_policy['Policy']['Arn']
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-run: reuse the existing policy. Its ARN is rebuilt from the account
    # id in the role ARN (arn:aws:iam::<account>:role/<name>) and Path='/'.
    print(e)
    account_id = iam.get_role(RoleName=DL_IAM_ROLE_NAME)['Role']['Arn'].split(':')[4]
    sparkify_policy_arn = f"arn:aws:iam::{account_id}:policy/{sparkify_policy_name}"

# AWS-managed policy with the baseline permissions a Glue service role needs.
response = iam.attach_role_policy(
    RoleName=DL_IAM_ROLE_NAME,
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')

# Project-specific S3 access defined above.
response = iam.attach_role_policy(
    RoleName=DL_IAM_ROLE_NAME,
    PolicyArn=sparkify_policy_arn)


## Create Crawler

In [None]:
# Look up the crawler and create it only if Glue reports that it does not
# exist. Other failures (permissions, throttling, ...) propagate instead of
# being mistaken for "not found".
try:
    crawler = glue.get_crawler(Name=CRAWLER)
except glue.exceptions.EntityNotFoundException:
    print(f'"{CRAWLER}" crawler is not found')
    print(f'Creating "{CRAWLER}" crawler')
    response = glue.create_crawler(
        Name=CRAWLER,
        Role=DL_IAM_ROLE_NAME,
        DatabaseName='sparkify',
        Description="Crawler for generated schema",
        Targets={
            'S3Targets': [
                {
                    'Path': f's3://{BUCKET}/song_data',
                    'Exclusions': []
                },
            ]
        },
        # Keep the catalog in sync with S3: update changed tables and remove
        # tables whose data was deleted.
        SchemaChangePolicy={
            'UpdateBehavior': 'UPDATE_IN_DATABASE',
            'DeleteBehavior': 'DELETE_FROM_DATABASE'
        }
        # Optional: let partitions inherit the table-level schema.
        #,Configuration='{ "Version": 1.0, "CrawlerOutput": { "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" } } }'
    )

    crawler = glue.get_crawler(Name=CRAWLER)


In [None]:
glue.start_crawler(Name=CRAWLER)

# Poll until the crawler returns to READY. Sleep for Glue's estimated time
# left when one is available, clamped to [min_sleep_secs, max_sleep_secs].
# Estimates can overshoot, and a zero estimate should trigger a quick re-check.
max_sleep_secs = 100
min_sleep_secs = 10
sleep_secs = max_sleep_secs

crawler = glue.get_crawler(Name=CRAWLER)
crawler_state = crawler['Crawler']['State']

while crawler_state != 'READY':
    time.sleep(sleep_secs)
    metrics = glue.get_crawler_metrics(CrawlerNameList=[CRAWLER])['CrawlerMetricsList'][0]
    if metrics['StillEstimating']:
        sleep_secs = max_sleep_secs
    else:
        time_left = int(metrics['TimeLeftSeconds'])
        if time_left > 0:
            print('Estimated Time Left: ', time_left)
        else:
            print('Crawler should finish soon')
        sleep_secs = min(max(time_left, min_sleep_secs), max_sleep_secs)
    # refresh crawler state
    crawler = glue.get_crawler(Name=CRAWLER)
    crawler_state = crawler['Crawler']['State']

metrics = glue.get_crawler_metrics(CrawlerNameList=[CRAWLER])['CrawlerMetricsList'][0]
print('\nTable Metrics')
print('Number of Tables Created: ', metrics['TablesCreated'])
print('Number of Tables Updated: ', metrics['TablesUpdated'])
print('Number of Tables Deleted: ', metrics['TablesDeleted'])

# READY also follows a failed or cancelled run, so report how the last crawl ended.
print('Last Crawl Status: ', crawler['Crawler'].get('LastCrawl', {}).get('Status'))
print('Crawler is done')

## Create Postgres Database for Airflow Backend

Task parallelization isn't available with the default SQLite backend and SequentialExecutor. Using a Postgres database as the Airflow metadata backend allows the LocalExecutor to run tasks in parallel.

Instructions:
- [Download Postgres UI App](https://www.postgresql.org/download/)
- Within Postgres query editor or psql terminal, run: CREATE DATABASE database_name;
- If you created the database as a user other than the default postgres user, set POSTGRES_USER below to that username


In [None]:
# Uncomment the line below to install ipython-sql, which provides the %sql magic loaded here.
# ! pip install ipython-sql
%load_ext sql

In [None]:
from urllib.parse import quote_plus

# Connection details for the Airflow metadata database. Fill in the values
# marked TODO before running this cell.
POSTGRES_USER = 'postgres'  # default superuser; change if the DB was created by another user
POSTGRES_PASSWORD = ''  # TODO: fill in
POSTGRES_HOST = 'postgres'
POSTGRES_PORT = '5432'
POSTGRES_DB = ''  # TODO: fill in (the name used in CREATE DATABASE)

# Fail loudly rather than writing a connection string with no database name
# into airflow.cfg.
if not POSTGRES_DB:
    raise ValueError('Set POSTGRES_DB (and POSTGRES_PASSWORD if required) before running this cell')

# URL-encode the credentials so characters such as '@', ':' or '/' in them
# don't corrupt the SQLAlchemy URL.
postgres_conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    quote_plus(POSTGRES_USER), quote_plus(POSTGRES_PASSWORD),
    POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB)

config = configparser.ConfigParser(allow_no_value=True)
with open('airflow/config/airflow.cfg') as airflow_cfg_file:
    config.read_file(airflow_cfg_file)

# Update to the postgres string. ConfigParser interpolation treats a lone '%'
# (as produced by URL encoding) as invalid, so it is escaped as '%%'.
config.set('core', 'sql_alchemy_conn', postgres_conn_string.replace('%', '%%'))
# write changes to config file (the with-block closes it)
with open('airflow/config/airflow.cfg', 'w') as configfile:
    config.write(configfile)

# Table Samples

## Dimension Tables

## Staging Tables

## Fact Tables