# Introduction

This notebook demonstrates how to build a simple cloud data warehouse on AWS Redshift. An ETL process will define the queries needed to create a star schema in a Redshift database, with fact and dimension tables optimized for order analytics. The pipeline will use an Infrastructure as Code (IaC) approach to create, manage, and tear down our AWS resources. A simple dashboard will demonstrate how similar workflows can support online analytical processing (OLAP).

Tools:
- Pandas
- Spark
- S3
- Redshift

# Data Exploration

### Imports

In [19]:
import os
import glob

import matplotlib
import pandas as pd
from pathlib import Path

Data for this project is stored as .csv files in the "source_data" folder, so before defining a database schema for analysis, we'll explore the data with pandas. The following cell prints a list of the CSV files in our source data directory.

Our data is spread across six files:
- products.csv
- orders.csv
- order_products__train.csv
- order_products__prior.csv
- departments.csv
- aisles.csv

Next, we'll load each file as a dataframe, look at unique values, and review basic descriptive stats. 
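Since the same three checks (unique counts, descriptive stats, missing values) are repeated for every file, they could be bundled into one helper. The sketch below is not part of the original notebook: `profile()` is a hypothetical function, and the toy dataframe only mimics the shape of departments.csv.

```python
import pandas as pd


def profile(df):
    """Return row count, unique counts, missing-value counts, and describe() for a dataframe.

    Hypothetical helper bundling the checks run on each source file below.
    """
    return {
        "rows": len(df),
        "nunique": df.nunique(),
        "missing": df.isna().sum(),
        "describe": df.describe(),
    }


# Toy frame shaped like departments.csv (values are illustrative)
departments = pd.DataFrame({
    "department_id": [1, 2, 3],
    "department": ["frozen", "other", "bakery"],
})

summary = profile(departments)
print(summary["rows"])                   # 3
print(summary["nunique"]["department"])  # 3
print(summary["missing"].sum())          # 0
```

For the real data, `profile(pd.read_csv(path))` could be called for each of the CSV paths listed below.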


In [57]:
# Get a list of .csv file paths under source_data
filepath = os.path.join(os.getcwd(), 'source_data')
file_path_list = []
for root, dirs, files in os.walk(filepath):
    for path in glob.glob(os.path.join(root, '*.csv')):
        file_path_list.append(path)
        print(path)

/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/products/products.csv
/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/departments/departments.csv
/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/aisles/aisles.csv
/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/order_products/order_products__train.csv
/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/order_products/order_products__prior.csv
/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/orders/orders.csv


### Products

In [36]:
products = pd.read_csv('/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/products/products.csv')
products

| | product_id | product_name | aisle_id | department_id |
|---|---|---|---|---|
| 0 | 1 | Chocolate Sandwich Cookies | 61 | 19 |
| 1 | 2 | All-Seasons Salt | 104 | 13 |
| 2 | 3 | Robust Golden Unsweetened Oolong Tea | 94 | 7 |
| 3 | 4 | Smart Ones Classic Favorites Mini Rigatoni Wit... | 38 | 1 |
| 4 | 5 | Green Chile Anytime Sauce | 5 | 13 |
| ... | ... | ... | ... | ... |
| 49683 | 49684 | Vodka, Triple Distilled, Twist of Vanilla | 124 | 5 |
| 49684 | 49685 | En Croute Roast Hazelnut Cranberry | 42 | 1 |
| 49685 | 49686 | Artisan Baguette | 112 | 3 |
| 49686 | 49687 | Smartblend Healthy Metabolism Dry Cat Food | 41 | 8 |


In [36]:
products.nunique()

product_id       49688
product_name     49688
aisle_id           134
department_id       21
dtype: int64

In [37]:
products.describe()

| | product_id | aisle_id | department_id |
|---|---|---|---|
| count | 49688.0 | 49688.0 | 49688.0 |
| mean | 24844.5 | 67.769582 | 11.728687 |
| std | 14343.834425 | 38.316162 | 5.85041 |
| min | 1.0 | 1.0 | 1.0 |
| 25% | 12422.75 | 35.0 | 7.0 |
| 50% | 24844.5 | 69.0 | 13.0 |
| 75% | 37266.25 | 100.0 | 17.0 |
| max | 49688.0 | 134.0 | 21.0 |


In [38]:
products.isna().sum()

product_id       0
product_name     0
aisle_id         0
department_id    0
dtype: int64

### Departments

In [37]:
departments = pd.read_csv('/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/departments/departments.csv')
departments.nunique()

department_id    21
department       21
dtype: int64

In [9]:
departments

| | department_id | department |
|---|---|---|
| 0 | 1 | frozen |
| 1 | 2 | other |
| 2 | 3 | bakery |
| 3 | 4 | produce |
| 4 | 5 | alcohol |
| 5 | 6 | international |
| 6 | 7 | beverages |
| 7 | 8 | pets |
| 8 | 9 | dry goods pasta |
| 9 | 10 | bulk |


In [39]:
departments.describe()

| | department_id |
|---|---|
| count | 21.0 |
| mean | 11.0 |
| std | 6.204837 |
| min | 1.0 |
| 25% | 6.0 |
| 50% | 11.0 |
| 75% | 16.0 |
| max | 21.0 |


In [41]:
departments.isna().sum()

department_id    0
department       0
dtype: int64

### Aisles

In [64]:
aisles = pd.read_csv('/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/aisles/aisles.csv')
aisles.nunique()

aisle_id    134
aisle       134
dtype: int64

In [65]:
aisles

| | aisle_id | aisle |
|---|---|---|
| 0 | 1 | prepared soups salads |
| 1 | 2 | specialty cheeses |
| 2 | 3 | energy granola bars |
| 3 | 4 | instant foods |
| 4 | 5 | marinades meat preparation |
| ... | ... | ... |
| 129 | 130 | hot cereal pancake mixes |
| 130 | 131 | dry pasta |
| 131 | 132 | beauty |
| 132 | 133 | muscles joints pain relief |


In [42]:
aisles.describe()

| | aisle_id |
|---|---|
| count | 134.0 |
| mean | 67.5 |
| std | 38.826537 |
| min | 1.0 |
| 25% | 34.25 |
| 50% | 67.5 |
| 75% | 100.75 |
| max | 134.0 |


In [43]:
aisles.isna().sum()

aisle_id    0
aisle       0
dtype: int64

## Using Spark for Large File Data Exploration

Our orders data is somewhat large for pandas, and the order_products data is split across two files. I could use something like Dask to concatenate and work on these files. However, since we'll be using Spark again in later steps, I'll use it here to explore the files and review some basic stats.

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

# One SparkSession is enough; getOrCreate() reuses an existing session if there is one
spark = SparkSession.builder.appName("view_orders").getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext, if needed

# spark.stop()  # run when finished to release resources

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/09 19:32:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Orders

In [41]:
orders = spark.read.option("header",True)\
    .csv('/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/orders/orders.csv')

                                                                                

In [37]:
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- order_dow: string (nullable = true)
 |-- order_hour_of_day: string (nullable = true)
 |-- days_since_prior_order: string (nullable = true)



In [38]:
orders.describe().show()



+-------+-----------------+------------------+--------+------------------+------------------+------------------+----------------------+
|summary|         order_id|           user_id|eval_set|      order_number|         order_dow| order_hour_of_day|days_since_prior_order|
+-------+-----------------+------------------+--------+------------------+------------------+------------------+----------------------+
|  count|          3421083|           3421083| 3421083|           3421083|           3421083|           3421083|               3214874|
|   mean|        1710542.0|102978.20805926077|    null|17.154857979183785|2.7762191095626734| 13.45201534134074|    11.114836226863012|
| stddev|987581.7398225801| 59533.71779350224|    null|17.733164470966674| 2.046829193987996|4.2260884021020235|       9.2067365175338|
|    min|                1|                 1|   prior|                 1|                 0|                00|                   0.0|
|    max|           999999|             99999|  

                                                                                

The 'eval_set' column is used for machine learning purposes, so we'll drop it for this project.
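Because the CSVs were read without a schema, `printSchema()` reports every column as a string, so the `min` and `max` rows of `describe()` are string comparisons (the `mean` and `stddev` rows are still computed numerically). That is why `max` for `order_id` shows `999999` even though the mean of about 1.71 million implies IDs up to 3,421,083, and why `max(product_id)` in the order_products summary is `9999` despite nearly 50K products. The plain-Python sketch below reproduces the effect with illustrative values. In Spark, reading with `.option("inferSchema", True)` or casting columns with `pyspark.sql.functions.col(...).cast("int")` before calling `describe()` should give numeric bounds.

```python
# Spark read the CSVs without a schema, so every column is a string.
# Illustrative product_id values stored as text, as in order_products:
product_ids = ["1", "9999", "49688", "24852"]

# Text comparison is lexicographic: "9999" > "49688" because "9" > "4".
text_max = max(product_ids)

# Comparing as integers gives the true numeric maximum.
numeric_max = max(product_ids, key=int)

print(text_max)     # 9999
print(numeric_max)  # 49688
```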

## order_products_*

In [42]:
paths = ['/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/order_products/order_products__prior.csv',
        '/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/order_products/order_products__train.csv']

order_products = spark.read.options(header=True).csv(paths)
    

In [43]:
order_products.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- add_to_cart_order: string (nullable = true)
 |-- reordered: string (nullable = true)



In [44]:
order_products.describe().show()

                                                                                

+-------+-----------------+------------------+------------------+-------------------+
|summary|         order_id|        product_id| add_to_cart_order|          reordered|
+-------+-----------------+------------------+------------------+-------------------+
|  count|         33819106|          33819106|          33819106|           33819106|
|   mean|1710566.290919606|25575.514530129803| 8.367737574139305| 0.5900617242809434|
| stddev|987400.7619328625|14097.696773864618|7.1395401151131015|0.49182201350891674|
|    min|                1|                 1|                 1|                  0|
|    max|           999999|              9999|                99|                  1|
+-------+-----------------+------------------+------------------+-------------------+



How many distinct products were ordered? 

In [76]:
# from pyspark.sql.functions import countDistinct

order_products2 = order_products.select(countDistinct("product_id"))
order_products2.show()



+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                     49685|
+--------------------------+



                                                                                

How many orders had product_id '49302' placed in the cart first? 

In [110]:
#spark sql 

order_products.createOrReplaceTempView("order_in_cart")

spark.sql('''
    SELECT COUNT(order_id)
    FROM order_in_cart
    WHERE product_id = '49302'
    AND add_to_cart_order = '1'

''').show()



+---------------+
|count(order_id)|
+---------------+
|             28|
+---------------+



                                                                                

Select all order_ids where product_id 49302 was put in the cart first.

In [129]:
#df filtering 

order_products.select(['order_id','product_id','add_to_cart_order']).filter((order_products.product_id=='49302') &
                 (order_products.add_to_cart_order=='1')).show(100)




+--------+----------+-----------------+
|order_id|product_id|add_to_cart_order|
+--------+----------+-----------------+
|  262316|     49302|                1|
|  269385|     49302|                1|
|  312335|     49302|                1|
|  316883|     49302|                1|
|  584879|     49302|                1|
|  683097|     49302|                1|
|  691172|     49302|                1|
|  809060|     49302|                1|
| 1111504|     49302|                1|
| 1140876|     49302|                1|
| 1666866|     49302|                1|
| 1714323|     49302|                1|
| 2172185|     49302|                1|
| 2355248|     49302|                1|
| 2470870|     49302|                1|
| 2494999|     49302|                1|
| 2541372|     49302|                1|
| 2553776|     49302|                1|
| 2578428|     49302|                1|
| 2787832|     49302|                1|
| 2851180|     49302|                1|
| 2861653|     49302|                1|




I can easily write the newly concatenated dataframe to files in my chosen format with:
```
dfname.write.save(out_path, format="format_name", header=True)
```

This results in a directory of part files (typically one per partition) that can easily be used in downstream processes.

In [171]:
order_products.write.save('/Users/margiehenry/Projects/cloud_dw_from_csv/source_data/order_products_all', format="csv", header=True)


                                                                                

## Investigating the relationship between departments and aisles.

Can a given aisle house products for many departments? To answer this question we will perform a join on the products, departments, and aisles dataframes and create a pivot table to visualize our results. 

In [12]:
product_detailed = pd.merge(pd.merge(products,departments,on='department_id'),aisles,on='aisle_id')
product_detailed

| | product_id | product_name | aisle_id | department_id | department | aisle |
|---|---|---|---|---|---|---|
| 0 | 1 | Chocolate Sandwich Cookies | 61 | 19 | snacks | cookies cakes |
| 1 | 78 | Nutter Butter Cookie Bites Go-Pak | 61 | 19 | snacks | cookies cakes |
| 2 | 102 | Danish Butter Cookies | 61 | 19 | snacks | cookies cakes |
| 3 | 172 | Gluten Free All Natural Chocolate Chip Cookies | 61 | 19 | snacks | cookies cakes |
| 4 | 285 | Mini Nilla Wafers Munch Pack | 61 | 19 | snacks | cookies cakes |
| ... | ... | ... | ... | ... | ... | ... |
| 49683 | 22827 | Organic Black Mission Figs | 18 | 10 | bulk | bulk dried fruits vegetables |
| 49684 | 28655 | Crystallized Ginger Chunks | 18 | 10 | bulk | bulk dried fruits vegetables |
| 49685 | 30365 | Vegetable Chips | 18 | 10 | bulk | bulk dried fruits vegetables |
| 49686 | 38007 | Naturally Sweet Plantain Chips | 18 | 10 | bulk | bulk dried fruits vegetables |


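Our pivot table shows, for each aisle (rows) and department (columns), which combinations contain products. Note that `pivot_table` aggregates with the mean by default, so each populated cell holds the mean `product_id` rather than a count; the useful signal is which cells are filled. In the rows shown, each aisle belongs to a single department, while a department can span several aisles, so the relationship between "department" and "aisle" is one-to-many rather than one-to-one. The "babies" department, for example, is spread across the "baby accessories," "baby bath body care," and "baby food formula" aisles.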

In [30]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

# pivot_table's default aggfunc is 'mean', so each cell is the mean product_id
product_detailed.pivot_table("product_id", index='aisle', columns='department')

| aisle | alcohol | babies | bakery | beverages | breakfast | bulk | canned goods | dairy eggs | deli | dry goods pasta | frozen | household | international | meat seafood | missing | other | pantry | personal care | pets | produce | snacks |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| air fresheners candles | | | | | | | | | | | | 24177.070423 | | | | | | | | | |
| asian foods | | | | | | | | | | | | | 24360.928926 | | | | | | | | |
| baby accessories | | 23793.113636 | | | | | | | | | | | | | | | | | | | |
| baby bath body care | | 23451.477273 | | | | | | | | | | | | | | | | | | | |
| baby food formula | | 25210.770195 | | | | | | | | | | | | | | | | | | | |
| bakery desserts | | | 25824.383838 | | | | | | | | | | | | | | | | | | |
| baking ingredients | | | | | | | | | | | | | | | | | 25507.847512 | | | | |
| baking supplies decor | | | | | | | | | | | | | | | | | 25280.572414 | | | | |
| beauty | | | | | | | | | | | | | | | | | | 25488.258427 | | | |
| beers coolers | 24662.241558 | | | | | | | | | | | | | | | | | | | | |
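Because `pivot_table` averages by default, counting products per cell and checking cardinality directly gives a more explicit answer to the aisle/department question. The sketch below uses a toy stand-in for `product_detailed` (values are illustrative), passes `aggfunc="count"`, and uses `groupby(...).nunique()` to count departments per aisle and aisles per department.

```python
import pandas as pd

# Toy stand-in for product_detailed (the real frame has 49,688 rows)
product_detailed = pd.DataFrame({
    "product_id": [1, 2, 3, 4, 5],
    "aisle": ["baby accessories", "baby food formula", "baby food formula",
              "beers coolers", "beauty"],
    "department": ["babies", "babies", "babies", "alcohol", "personal care"],
})

# Count products in each aisle/department cell instead of averaging product_id
counts = product_detailed.pivot_table("product_id", index="aisle",
                                      columns="department", aggfunc="count")

# Cardinality checks in both directions
depts_per_aisle = product_detailed.groupby("aisle")["department"].nunique()
aisles_per_dept = product_detailed.groupby("department")["aisle"].nunique()

print(depts_per_aisle.max())      # 1: every aisle maps to exactly one department
print(aisles_per_dept["babies"])  # 2: "babies" spans two aisles in this toy data
```

On the real data, `depts_per_aisle.max() == 1` would confirm that no aisle houses products from more than one department.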


## Summary of Exploration
Looking at the unique value counts across our datasets we learn that our source data:
- Contains information about over 3MM (3,421,083) orders placed by around 200K (206,209) unique customers. 
- Describes nearly 50K (49,688) unique products, from 21 different departments, stored across 134 aisles.
- Provides details about items that were reordered (order_products__*.reordered) and about orders placed 24/7.
- Is limited to the first 100 orders placed by a given user. 
- Has missing values for "days_since_prior_order" in the orders file. This is expected: the 206,209 missing values (3,421,083 orders minus 3,214,874 non-null values) match the number of unique customers, consistent with each customer's first order having no prior order.



There are two files containing information about product orders that can be combined into a single file: order_products__train and order_products__prior.  

In [59]:
# Save to new 'raw file' prior to S3 upload with desired folder structure

from datetime import datetime
current_dateTime = datetime.now()

#def save_df 

#def save_spark_df

#zone = raw|processed|curated

# path --> f'{key}/{zone}/{year}/{month}/{day}/{file_name}'

In [66]:

aisles.to_csv(f'/Users/margiehenry/Projects/cloud_dw_from_csv/raw/aisles/\
    {current_dateTime.year}/{current_dateTime.month}/{current_dateTime.day}/aisles.csv')

OSError: Cannot save file into a non-existent directory: '/Users/margiehenry/Projects/cloud_dw_from_csv/raw/aisles/2023/1/9'
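The `OSError` occurs because `DataFrame.to_csv()` does not create missing parent directories. A minimal sketch of a hypothetical `dated_path()` helper that builds the dated folder with `pathlib` and creates it before writing. The comment in the earlier cell sketches a `{key}/{zone}/...` layout while the failing cell writes `raw/aisles/...`; the helper follows the cell's zone-first order, which is easy to swap.

```python
from datetime import datetime
from pathlib import Path


def dated_path(base, zone, key, file_name, when=None):
    """Return base/zone/key/<year>/<month>/<day>/file_name, creating the folders.

    Hypothetical helper; zone is e.g. raw, processed, or curated.
    """
    when = when or datetime.now()
    folder = Path(base, zone, key, str(when.year), str(when.month), str(when.day))
    # to_csv() won't create missing directories, so create them up front
    folder.mkdir(parents=True, exist_ok=True)
    return folder / file_name
```

With the notebook's paths, the failing cell could be written as `aisles.to_csv(dated_path('/Users/margiehenry/Projects/cloud_dw_from_csv', 'raw', 'aisles', 'aisles.csv'), index=False)`. Passing `index=False` keeps pandas from writing its row index as an extra, unnamed column.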

In [63]:
current_dateTime.day

9

## Data Modeling

An entity relationship diagram (ERD) representing our transactional data is as follows: 

![Transactional data ERG](images/transactional_tables.png)

## Star Schema
Next, we'll design a star schema optimized for order analysis using our csv files representing transactional data.

The business process we want to model is product orders, so the count of orders will be the transactional fact at the center of our star schema. A single entry in the fact table will represent the number of orders placed, for a given product, by a specific customer during a certain shopping period.

- We will create a "dim_periods" table by serializing unique combinations of "day of the week" and "time of day." Information about transaction dates and holidays would be useful additions in future iterations of this table. 
- The orders dimension table will describe who placed an order, the time since the customer's previous order, and the customer's order sequencing information. 
- The products dimension table will house product names. Adding product costs (when available) would allow our fact table to include information about total revenue generated. 
- The departments and aisles tables can remain as they are. 
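The design above can be sketched in plain Python to make the fact grain concrete. Two assumptions are labeled here and in the code: `period_id = order_dow * 24 + order_hour_of_day` is a hypothetical serialization of the day/hour combinations, and the inputs are tiny in-memory stand-ins for the orders and order_products data. In the real pipeline this logic would live in SQL or Spark.

```python
from collections import Counter
from itertools import product

# dim_periods: one row per (order_dow, order_hour_of_day) combination.
# Assumption: period_id = order_dow * 24 + order_hour_of_day (hypothetical key scheme).
dim_periods = [
    {"period_id": dow * 24 + hour, "order_dow": dow, "order_hour_of_day": hour}
    for dow, hour in product(range(7), range(24))
]


def build_fact_orders(orders, order_products):
    """Count orders at the fact grain (product_id, user_id, period_id).

    orders: dict mapping order_id -> (user_id, order_dow, order_hour_of_day)
    order_products: iterable of (order_id, product_id) pairs; each pair
        appears once per order, so counting pairs counts orders.
    """
    facts = Counter()
    for order_id, product_id in order_products:
        user_id, dow, hour = orders[order_id]
        facts[(product_id, user_id, dow * 24 + hour)] += 1
    return facts


# Tiny illustrative inputs (not real data)
orders = {1: (10, 0, 8), 2: (10, 0, 8), 3: (11, 6, 23)}
order_products = [(1, 49302), (2, 49302), (3, 49302), (3, 13176)]

fact_orders = build_fact_orders(orders, order_products)
print(len(dim_periods))               # 168
print(fact_orders[(49302, 10, 8)])    # 2
print(fact_orders[(13176, 11, 167)])  # 1
```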

The resulting star schema is: 

![Star schema](images/star_schema_main.png)

# Working with AWS



### Imports

In [1]:
from datetime import datetime

import boto3
import json
import configparser
import logging
from botocore.exceptions import ClientError

## Configure Secrets

To begin, we'll need to set up a file called "dwh.cfg", which will hold all of the secret configuration details needed to create our Redshift cluster. **dwh.cfg should not be committed to GitHub**; we can avoid revealing our secrets by adding this file to our .gitignore. The dwh.cfg file should contain the following sections, identified using brackets, along with the key-value pairs representing our secrets:

```
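[AWS]
AWS_KEY_ID=<your AWS access key ID>
AWS_SECRET_KEY=<your AWS secret access key>
DEFAULT_REGION=<your default region>
ACCOUNT=<your AWS account number>

[DWH]
IAM_ROLE_NAME=<your IAM role name>
glue-role-name=<name of the IAM role Glue will assume>
glue-db-name=<name of the Glue Data Catalog database>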
```

Docs for configparser, which demonstrate a basic config file, can be found here: https://docs.python.org/3/library/configparser.html#quick-start

After importing the parser and creating an instance of it, we'll read our 'dwh.cfg' file and use the parser's `get()` method to access our secrets.

For simplicity, we'll keep these values in a config file read with configparser rather than in AWS Secrets Manager.
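A quick way to check the file's structure without touching real secrets is to parse a template string. The sketch below uses `ConfigParser.read_string()` with placeholder values only (every value is hypothetical). It also shows a configparser detail that matters for the `get()` calls below: option names are case-insensitive because they are lower-cased on read, while section names stay case-sensitive.

```python
import configparser

# Placeholder values only; real secrets belong in dwh.cfg, which stays out of git.
SAMPLE_CFG = """
[AWS]
AWS_KEY_ID = AKIAEXAMPLE
AWS_SECRET_KEY = example-secret
DEFAULT_REGION = us-east-1
ACCOUNT = 123456789012

[DWH]
IAM_ROLE_NAME = example-redshift-role
glue-role-name = example-glue-role
glue-db-name = example_glue_db
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Option names are lower-cased on read, so lookups ignore case...
print(config.get("AWS", "default_region"))  # us-east-1
print(config.get("DWH", "glue-db-name"))    # example_glue_db
# ...but section names are case-sensitive.
print(config.has_section("aws"))            # False
```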

In [13]:
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed after reading
with open('dwh.cfg') as f:
    config.read_file(f)

KEY = config.get('AWS', 'AWS_KEY_ID')
SECRET = config.get('AWS', 'AWS_SECRET_KEY')
DEFAULT_REGION = config.get('AWS', 'DEFAULT_REGION')
ACCOUNT = config.get('AWS', 'ACCOUNT')
IAM_ROLE_NAME = config.get("DWH", "IAM_ROLE_NAME")
GLUE_ROLE_NAME = config.get("DWH", "glue-role-name")
GLUE_DB_NAME = config.get("DWH", "glue-db-name")


## Use the Python SDK Boto3 to Create an S3 bucket.

With AWS access secrets configured, we can use Boto3 to create an S3 bucket. 

In [3]:
iam = boto3.client('iam', region_name=DEFAULT_REGION, 
                  aws_access_key_id=KEY, 
                  aws_secret_access_key=SECRET)

In [4]:
s3 = boto3.resource('s3', region_name=DEFAULT_REGION, 
                    aws_access_key_id=KEY, 
                    aws_secret_access_key=SECRET)

def create_bucket(bucket_name, s3_resource, region=None):
    """Create an S3 bucket; return True on success and False on a ClientError."""
    try:
        if region is None or region == 'us-east-1':
            # us-east-1 is S3's default location and rejects an explicit LocationConstraint
            s3_resource.create_bucket(Bucket=bucket_name)
        else:
            # Reuse the caller's resource (and its credentials) instead of creating a new one
            location = {'LocationConstraint': region}
            s3_resource.create_bucket(Bucket=bucket_name,
                                      CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

bucket_name = 'productdatauseast1'
create_bucket(bucket_name, s3, None)

True

Note: Because my default region is us-east-1, I do not need to pass this constraint when creating a bucket. Doing so will result in the error `An error occurred (InvalidLocationConstraint) when calling the CreateBucket operation: The specified location-constraint is not valid`

Docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-creating-buckets.html#create-an-amazon-s3-bucket

## Create a Resource-Based Bucket Policy

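We'll create a bucket policy that grants S3 actions on our bucket and its objects to our own AWS account only; it grants no public or cross-account access. Note that the `arn:aws:iam::<account>:root` principal refers to the account as a whole rather than only the root user, so IAM users and roles in the account that have matching identity-based permissions can also use the bucket. This setup accommodates our current pipeline, but would need to be altered if we used other AWS resources upstream.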


Docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-bucket-policies.html#set-a-bucket-policy

In [5]:
bucket_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Sid': 'AddPerm',
        'Effect': 'Allow',
        'Principal': {
            "AWS": f"arn:aws:iam::{ACCOUNT}:root"
        },
        'Action': "s3:*",
        'Resource': [f"arn:aws:s3:::{bucket_name}",f"arn:aws:s3:::{bucket_name}/*"]
    }]
}

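# Serialize the policy dict to a JSON string
bucket_policy = json.dumps(bucket_policy)

# Attach the policy with an S3 client that uses the same credentials and region
s3 = boto3.client('s3', region_name=DEFAULT_REGION,
                  aws_access_key_id=KEY,
                  aws_secret_access_key=SECRET)
s3.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)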

{'ResponseMetadata': {'RequestId': 'NKN66E39HHSYJ6QN',
  'HostId': '3JgSC7Lv3aoMPJmLlGqL4fKwOC7TemkkK9jOhSGIl0KGQpdjMzCURCIewUD6j/RStIAW/k9AFQ4=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '3JgSC7Lv3aoMPJmLlGqL4fKwOC7TemkkK9jOhSGIl0KGQpdjMzCURCIewUD6j/RStIAW/k9AFQ4=',
   'x-amz-request-id': 'NKN66E39HHSYJ6QN',
   'date': 'Mon, 09 Jan 2023 20:36:25 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}
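A bucket policy grants access but does not by itself block public access. If the goal is a private bucket, S3 Block Public Access settings are the usual safeguard. The sketch below wraps boto3's `put_public_access_block` call in a hypothetical helper, `block_public_access()`, that accepts any S3 client, so it can be exercised here with a stub instead of a live AWS call.

```python
def block_public_access(s3_client, bucket_name):
    """Turn on all four S3 Block Public Access settings for one bucket.

    Hypothetical helper; s3_client is expected to be a boto3 S3 client.
    """
    config = {
        "BlockPublicAcls": True,        # reject new public ACLs
        "IgnorePublicAcls": True,       # ignore any existing public ACLs
        "BlockPublicPolicy": True,      # reject bucket policies that grant public access
        "RestrictPublicBuckets": True,  # restrict access if a public policy exists
    }
    return s3_client.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration=config,
    )


class StubS3Client:
    """Records calls instead of talking to AWS (for a dry run)."""

    def __init__(self):
        self.calls = []

    def put_public_access_block(self, **kwargs):
        self.calls.append(kwargs)
        return {"ResponseMetadata": {"HTTPStatusCode": 200}}


stub = StubS3Client()
block_public_access(stub, "productdatauseast1")
print(stub.calls[0]["PublicAccessBlockConfiguration"]["BlockPublicPolicy"])  # True
```

With the real client from the cell above, the call would be `block_public_access(s3, bucket_name)`. Our policy's principal is our own account, so `BlockPublicPolicy` should not reject it.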

## Upload sample data from `source_data` directory.

This step can be accomplished in several ways; here we'll walk the source directory and use boto3 to upload each file to S3 under its own name.

In [17]:
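import glob
import os
from pathlib import Path

from boto3.exceptions import S3UploadFailedError

# The kernel was restarted for this section, so rebuild the list of source CSVs.
# Skip Spark part files (e.g. those under source_data/order_products_all).
file_path_list = [p for p in glob.glob(os.path.join(os.getcwd(), 'source_data', '*', '*.csv'))
                  if not Path(p).name.startswith('part-')]

def upload_files(file_path_list, bucket_name, s3_client):
    """Upload each file to S3 as souredata/<file stem>; stop at the first failure."""
    for file_path in file_path_list:
        filename = Path(file_path).stem
        try:
            # Key prefix matches the objects listed in the next section
            s3_client.upload_file(file_path, bucket_name, f'souredata/{filename}')
            logging.info(f"{filename} successfully uploaded to {bucket_name}")
        except (ClientError, S3UploadFailedError) as e:
            logging.error(e)
            return False
    return True

upload_files(file_path_list, bucket_name, s3)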

## View the content of the S3 bucket

Note: `list_objects_v2()` returns at most 1,000 objects per call. If we were working with a large number of files, we could pass the `MaxKeys` parameter to limit the number of objects returned per call and use a paginator to retrieve the rest. 


In [83]:
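s3_client = boto3.client("s3", region_name=DEFAULT_REGION,
                         aws_access_key_id=KEY,
                         aws_secret_access_key=SECRET)
kwargs = {'Bucket': bucket_name}

response = s3_client.list_objects_v2(**kwargs)

# "Contents" is absent when the bucket is empty, so default to an empty list
for file in response.get("Contents", []):
    print(file["Key"])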

souredata/aisles
souredata/departments
souredata/order_products__prior
souredata/order_products__train
souredata/orders
souredata/products
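For buckets with more than 1,000 objects, boto3's paginator for `list_objects_v2` handles continuation tokens automatically. A minimal sketch with a hypothetical `list_keys()` helper; the stub classes stand in for a boto3 client so the example runs without AWS.

```python
def list_keys(s3_client, bucket_name, prefix=""):
    """Return every object key under prefix, following list_objects_v2 pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # Pages with no matching objects have no "Contents" entry
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


class StubPaginator:
    def __init__(self, pages):
        self.pages = pages

    def paginate(self, **kwargs):
        return iter(self.pages)


class StubS3Client:
    """Returns canned pages in place of boto3.client("s3")."""

    def __init__(self, pages):
        self.pages = pages

    def get_paginator(self, operation_name):
        assert operation_name == "list_objects_v2"
        return StubPaginator(self.pages)


pages = [
    {"Contents": [{"Key": "souredata/aisles"}, {"Key": "souredata/orders"}]},
    {"Contents": [{"Key": "souredata/products"}]},
    {},  # an empty page
]
print(list_keys(StubS3Client(pages), "productdatauseast1", "souredata/"))
# ['souredata/aisles', 'souredata/orders', 'souredata/products']
```

Against the real bucket, the call would be `list_keys(s3_client, bucket_name, "souredata/")`.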


## Create an S3 Gateway Endpoint for the VPC

### Use the AWS CLI `describe-vpcs` command to get the VpcId of the VPC that should be granted access to S3

In [198]:
!aws ec2 describe-vpcs

{
    "Vpcs": [
        {
            "CidrBlock": "172.31.0.0/16",
            "DhcpOptionsId": "dopt-02cbe92f0bd93e7f8",
            "State": "available",
            "VpcId": "vpc-0439e54e72f5467b2",
            "OwnerId": "252119340719",
            "InstanceTenancy": "default",
            "CidrBlockAssociationSet": [
                {
                    "AssociationId": "vpc-cidr-assoc-0dd3ee306a88dc8cd",
                    "CidrBlock": "172.31.0.0/16",
                    "CidrBlockState": {
                        "State": "associated"
                    }
                }
            ],
            "IsDefault": true
        }
    ]
}


### Get the RouteTableId that will be used to configure the gateway.

In [199]:
!aws ec2 describe-route-tables

{
    "RouteTables": [
        {
            "Associations": [
                {
                    "Main": true,
                    "RouteTableAssociationId": "rtbassoc-019f72fce23979684",
                    "RouteTableId": "rtb-01041395851240626",
                    "AssociationState": {
                        "State": "associated"
                    }
                }
            ],
            "PropagatingVgws": [],
            "RouteTableId": "rtb-01041395851240626",
            "Routes": [
                {
                    "DestinationCidrBlock": "172.31.0.0/16",
                    "GatewayId": "local",
                    "Origin": "CreateRouteTable",
                    "State": "active"
                },
                {
                    "DestinationCidrBlock": "0.0.0.0/0",
                    "GatewayId": "igw-07802e8797cd06177",
                    "Origin": "CreateRoute",
                    "State": "active"
                }
  

### Create the VPC gateway endpoint

Use the VPC ID and route table ID retrieved above to create an S3 Gateway Endpoint.

In [202]:
!aws ec2 create-vpc-endpoint --vpc-id vpc-0439e54e72f5467b2 \
    --service-name com.amazonaws.us-east-1.s3 --route-table-ids rtb-01041395851240626



An error occurred (RouteAlreadyExists) when calling the CreateVpcEndpoint operation: route table rtb-01041395851240626 already has a route with destination-prefix-list-id pl-63a5400a
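The `RouteAlreadyExists` error indicates that the route table already has a route to the S3 prefix list, which usually means an S3 gateway endpoint was created on an earlier run. A sketch of an idempotent alternative using boto3's EC2 client: `describe_vpc_endpoints` is checked first, and `create_vpc_endpoint` is only called when no matching endpoint exists. The helper name is hypothetical, and the stub client lets it run offline.

```python
def ensure_s3_gateway_endpoint(ec2_client, vpc_id, route_table_ids, region="us-east-1"):
    """Create an S3 Gateway Endpoint for vpc_id unless one already exists.

    Hypothetical helper; ec2_client is expected to be a boto3 EC2 client.
    Returns the endpoint ID.
    """
    service_name = f"com.amazonaws.{region}.s3"
    existing = ec2_client.describe_vpc_endpoints(
        Filters=[
            {"Name": "vpc-id", "Values": [vpc_id]},
            {"Name": "service-name", "Values": [service_name]},
            {"Name": "vpc-endpoint-state", "Values": ["available", "pending"]},
        ]
    )["VpcEndpoints"]
    if existing:
        # Reuse the existing endpoint (it may be attached to other route tables)
        return existing[0]["VpcEndpointId"]
    created = ec2_client.create_vpc_endpoint(
        VpcEndpointType="Gateway",
        VpcId=vpc_id,
        ServiceName=service_name,
        RouteTableIds=route_table_ids,
    )
    return created["VpcEndpoint"]["VpcEndpointId"]


class StubEC2Client:
    """Stands in for boto3.client("ec2") so the helper can run offline."""

    def __init__(self, existing=None):
        self.existing = existing or []
        self.created = []

    def describe_vpc_endpoints(self, Filters):
        return {"VpcEndpoints": self.existing}

    def create_vpc_endpoint(self, **kwargs):
        self.created.append(kwargs)
        return {"VpcEndpoint": {"VpcEndpointId": "vpce-example"}}


fresh = StubEC2Client()
print(ensure_s3_gateway_endpoint(fresh, "vpc-0439e54e72f5467b2", ["rtb-01041395851240626"]))
# vpce-example

already = StubEC2Client(existing=[{"VpcEndpointId": "vpce-existing"}])
print(ensure_s3_gateway_endpoint(already, "vpc-0439e54e72f5467b2", ["rtb-01041395851240626"]))
# vpce-existing
```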


## AWS Glue

We'll use AWS Glue to:
- Read data from S3
- Run a serverless ETL job that can:
  - Convert our CSV data into compressed Parquet format
  - Perform SQL operations that allow us to define tables which reflect our desired star schema
  - Clean our data based on the data discovery performed above
  - Load our processed data into a staging area of S3, so that it can be easily used by AWS Redshift

The ETL steps that we'll define to process our data can be accomplished with Glue. A separate Glue component will handle each step below:
- A crawler that reads our raw S3 data and populates the AWS Glue Data Catalog with metadata
- A Glue Job that performs the data transformations

It's important to note that our CSV data needs to be UTF-8 encoded for Glue to work properly.

This architecture requires the following resources and configuration:
- A route table that stores the network paths to various destinations.
- A VPC gateway that gives access to outside networks and to resources like S3, which live outside our VPC.
- An S3 Gateway Endpoint that allows S3 traffic from our Glue Jobs to reach our S3 buckets.
- S3 buckets, which are storage locations within AWS; these all exist outside the VPC that we set up.
- An AWS Glue Data Catalog to persist table metadata.


Steps will include: 
- Create a role that can access S3 and other services 
- Create an access policy for AWS Glue 
- Create a database to house our tables 
- Create tables for each data source 
- Use our S3 bucket as a data source
- Set up network access to data stores


### Create an IAM role with privileges to access S3 and other services, which can be assumed by AWS Glue.

In [218]:
try:
    iam.create_role(
    Path="/",
    RoleName=GLUE_ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps({
        "Version":"2012-10-17",
        "Statement":[{
            "Effect":"Allow",
            "Principal":{
                "Service":["glue.amazonaws.com"]},
            "Action":["sts:AssumeRole"]}]}),
    Description='Allow glue service to assume role to access other services')
except Exception as e:
    logging.error(e)
    print(e)
else:
    # Only report success when create_role did not raise
    print(True)

True


### Create an Access Policy for the newly created glue role, which grants permissions to list all bucket objects and perform all S3 Object-based actions on the bucket's content.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.put_role_policy

In [223]:
try: 
    response = iam.put_role_policy(
        RoleName=GLUE_ROLE_NAME,
        PolicyName="S3Access",
        PolicyDocument=json.dumps({
            "Version": "2012-10-17",
            "Statement":[
                {
                    "Sid": "ListBucketObjects",
                    "Effect": "Allow",
                    "Action":["s3:ListBucket"],
                    "Resource": ["arn:aws:s3:::productdatauseast1"]
                },
                {
                    "Sid": "AllObjectActions",
                    "Effect": "Allow",
                    "Action":["s3:*Object"],
                    "Resource": ["arn:aws:s3:::productdatauseast1/*"]
                }
            ]
        })
    )
except Exception as e:
    logging.error(e)
    print(e)
else:
    # Only report success when put_role_policy did not raise
    print(True)

True


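### Add a second inline policy to the Glue role using IAM's `put_role_policy` method. This policy lets Glue call the Glue API, manage its own `aws-glue-*` S3 buckets and objects, create and describe network interfaces in our VPC, read its role's policies, and publish logs and metrics to CloudWatch.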

A description of the permissions granted by this policy can be found here: https://docs.aws.amazon.com/glue/latest/dg/create-service-policy.html


In [10]:
try: 
    response = iam.put_role_policy(
        RoleName=GLUE_ROLE_NAME,
        PolicyName="GlueServiceAccess",  # hypothetical name; reusing "S3Access" would overwrite the S3 policy above
        PolicyDocument=json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "glue:*",
                        "s3:GetBucketLocation",
                        "s3:ListBucket",
                        "s3:ListAllMyBuckets",
                        "s3:GetBucketAcl",
                        "ec2:DescribeVpcEndpoints",
                        "ec2:DescribeRouteTables",
                        "ec2:CreateNetworkInterface",
                        "ec2:DeleteNetworkInterface",
                        "ec2:DescribeNetworkInterfaces",
                        "ec2:DescribeSecurityGroups",
                        "ec2:DescribeSubnets",
                        "ec2:DescribeVpcAttribute",
                        "iam:ListRolePolicies",
                        "iam:GetRole",
                        "iam:GetRolePolicy",
                        "cloudwatch:PutMetricData"                
                    ],
                    "Resource": [
                        "*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "s3:CreateBucket",
                        "s3:PutBucketPublicAccessBlock"
                    ],
                    "Resource": [
                        "arn:aws:s3:::aws-glue-*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject",
                        "s3:PutObject",
                        "s3:DeleteObject"
                    ],
                    "Resource": [
                        "arn:aws:s3:::aws-glue-*/*",
                        "arn:aws:s3:::*/*aws-glue-*/*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "s3:GetObject"
                    ],
                    "Resource": [
                        "arn:aws:s3:::crawler-public*",
                        "arn:aws:s3:::aws-glue-*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents",
                        "logs:AssociateKmsKey"                
                    ],
                    "Resource": [
                        "arn:aws:logs:*:*:/aws-glue/*"
                    ]
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateTags",
                        "ec2:DeleteTags"
                    ],
                    "Condition": {
                        "ForAllValues:StringEquals": {
                            "aws:TagKeys": [
                                "aws-glue-service-resource"
                            ]
                        }
                    },
                    "Resource": [
                        "arn:aws:ec2:*:*:network-interface/*",
                        "arn:aws:ec2:*:*:security-group/*",
                        "arn:aws:ec2:*:*:instance/*"
                    ]
                }
            ]
        }))
except Exception as e:
    logging.error(e)
    print(e)
print (True)

True


## Create an AWS Glue database to house our tables using boto3

In [12]:
glueClient = boto3.client('glue')

In [16]:
try: 
    res = glueClient.create_database(
        DatabaseInput={
            'Name': GLUE_DB_NAME 
        })
except Exception as e:
    logging.error(e)
    print(e)
print (True)

ERROR:root:An error occurred (AlreadyExistsException) when calling the CreateDatabase operation: Database already exists.


An error occurred (AlreadyExistsException) when calling the CreateDatabase operation: Database already exists.
True
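The `AlreadyExistsException` above is expected whenever this cell is re-run. Below is a minimal sketch of an idempotent alternative. It assumes the `glueClient` and `GLUE_DB_NAME` defined earlier, and it relies on botocore's `ClientError`, which stores the AWS error code in `e.response['Error']['Code']`. The helper name `ensure_glue_database` is hypothetical and not part of the original pipeline.

```python
def ensure_glue_database(glue_client, name):
    """Create a Glue database if it does not already exist.

    Returns True if the database was created, False if it already existed.
    Hypothetical helper; assumes botocore's ClientError exposes
    e.response['Error']['Code'].
    """
    try:
        glue_client.create_database(DatabaseInput={'Name': name})
        return True
    except Exception as e:
        # botocore ClientError keeps the service error code in e.response
        code = getattr(e, 'response', {}).get('Error', {}).get('Code')
        if code == 'AlreadyExistsException':
            return False
        raise  # permissions, throttling, etc. are real failures

# Usage in the notebook:
# created = ensure_glue_database(glueClient, GLUE_DB_NAME)
```

With this helper, re-running the notebook raises only on unexpected errors, rather than logging every repeat run as an error.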


## Create a table for each data source and register it in our database to populate the AWS Glue Data Catalog

Table API docs: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-tables.html#aws-glue-api-catalog-tables-StorageDescriptor

Table data types: https://docs.aws.amazon.com/databrew/latest/dg/datatypes.html

In [None]:
try:
    response = glueClient.create_table(
        DatabaseName=GLUE_DB_NAME,
        TableInput={
            'Name': 'aisles',
            'StorageDescriptor': {
                # Data Catalog columns use Hive type names ('int', not 'integer')
                'Columns': [
                    {'Name': 'aisle_id', 'Type': 'int'},
                    {'Name': 'aisle', 'Type': 'string'},
                ],
                # NOTE: this Location appears to point at AWS's public sample
                # flight data; replace it with the S3 prefix holding aisles.csv
                'Location': 's3://crawler-public-us-west-2/flight/2016/csv',
                'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                'Compressed': False,
                'NumberOfBuckets': -1,
                'SerdeInfo': {
                    'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                    'Parameters': {
                        'field.delim': ',',
                        'serialization.format': ','
                    }
                },
            },
            # aisles.csv is not partitioned by month, so the 'mon'
            # partition key is omitted
            'TableType': 'EXTERNAL_TABLE',
            'Parameters': {
                'EXTERNAL': 'TRUE',
                'classification': 'csv',
                'columnsOrdered': 'true',
                'compressionType': 'none',
                'delimiter': ',',
                'skip.header.line.count': '1',  # skip the CSV header row
                'typeOfData': 'file'
            }
        }
    )
except Exception as e:
    logging.error(e)
    print(e)
print (True)
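The cell above registers only the `aisles` table. Since every source is a headered, comma-delimited CSV, the same `TableInput` structure can be reused. The sketch below uses a hypothetical helper, `csv_table_input`, that builds it from a table name, a column list, and an S3 location, so that `create_table` can be looped over each source. The S3 path is a placeholder. Only the `aisles` columns come from the cell above; column lists for the other files would need to be filled in from the data exploration section.

```python
def csv_table_input(name, columns, location):
    """Build a Glue TableInput dict for a headered, comma-delimited CSV.

    columns: list of (column_name, hive_type) tuples, e.g. ('aisle_id', 'int').
    location: S3 prefix holding the file(s); a placeholder in this sketch.
    """
    return {
        'Name': name,
        'StorageDescriptor': {
            'Columns': [{'Name': c, 'Type': t} for c, t in columns],
            'Location': location,
            'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
            'Compressed': False,
            'NumberOfBuckets': -1,
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                'Parameters': {'field.delim': ',', 'serialization.format': ','},
            },
        },
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {
            'EXTERNAL': 'TRUE',
            'classification': 'csv',
            'delimiter': ',',
            'skip.header.line.count': '1',  # skip the CSV header row
        },
    }

# Hypothetical mapping: table name -> (columns, S3 prefix); extend per source file
TABLES = {
    'aisles': ([('aisle_id', 'int'), ('aisle', 'string')],
               's3://<your-bucket>/aisles/'),  # placeholder location
}

# Usage in the notebook (requires glueClient and GLUE_DB_NAME):
# for name, (cols, loc) in TABLES.items():
#     glueClient.create_table(DatabaseName=GLUE_DB_NAME,
#                             TableInput=csv_table_input(name, cols, loc))
```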

# Create an identity-based IAM role policy granting Redshift read-only permissions on our S3 resource

A crucial step in our data pipeline is moving data from S3, our source storage layer, into Redshift. To support this step, we'll create an IAM role that gives Redshift read-only access to S3 so the data can be retrieved. This is necessary because S3 is an object storage service with no way to push files into another system or service. Redshift has to pull the data from S3 itself, using the permissions of a role it is allowed to assume.
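In Redshift, this pull happens through the `COPY` command. Its `IAM_ROLE` clause names the role whose permissions Redshift uses to read from S3. Below is a minimal sketch that assembles such a statement as a string. It assumes the target table already exists in Redshift, and it uses a placeholder S3 path, account ID, and region. `build_copy_sql` is a hypothetical helper, and it should only be fed trusted configuration values, since it does no quoting or escaping.

```python
def build_copy_sql(table, s3_path, role_arn, region):
    """Return a Redshift COPY statement that loads a headered CSV from S3.

    Redshift reads the files itself, authenticating as the IAM role named
    in the IAM_ROLE clause, which therefore needs S3 read access.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{role_arn}'\n"
        f"FORMAT AS CSV\n"
        f"IGNOREHEADER 1\n"
        f"REGION '{region}';"
    )

# Placeholder values, not real resources
print(build_copy_sql('aisles', 's3://<your-bucket>/aisles/',
                     'arn:aws:iam::123456789012:role/dwhAdmin', 'us-west-2'))
```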

To create the policy, we'll need to:
- Create a function `get_user_arns` that retrieves all user ARNs in our account. Reviewing existing identities helps us choose a new name that satisfies AWS's requirement that IAM names be unique within an account.
- Create a new IAM role using `iam.create_role()`.
- Attach a policy using `iam.attach_role_policy()`.
- Get the ARN for the role using `iam.get_role()`.

### Create a function that retrieves user ARNs

Before we create a new role, we want to make sure its name is unique. It's also good practice to review previously created roles and remove any that are no longer in use, so the account stays within its IAM quotas.

In [210]:
import boto3

iam = boto3.client('iam', region_name=DEFAULT_REGION, 
                  aws_access_key_id=KEY, 
                  aws_secret_access_key=SECRET)

def get_user_arns(iam_client):
    """Print the ARN of every IAM user in the account."""
    # list_users returns results in pages, so walk them with a paginator
    paginator = iam_client.get_paginator('list_users')
    for response in paginator.paginate():
        for user in response.get("Users"):
            print(user['Arn'])

get_user_arns(iam)
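`get_user_arns` lists users, but the name that must be unique here is the role's. Below is a minimal sketch of the equivalent check for roles, assuming the `iam` client above. IAM's `list_roles` operation is paginated in the same way as `list_users`. The helper name `role_exists` is hypothetical.

```python
def role_exists(iam_client, role_name):
    """Return True if an IAM role with this name already exists in the account."""
    paginator = iam_client.get_paginator('list_roles')
    for page in paginator.paginate():
        for role in page.get('Roles', []):
            # IAM role names are not distinguished by case, so compare case-insensitively
            if role['RoleName'].lower() == role_name.lower():
                return True
    return False

# Usage in the notebook:
# if role_exists(iam, IAM_ROLE_NAME):
#     print(f"{IAM_ROLE_NAME} already exists; reuse it or choose another name")
```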

### Create a new IAM Role
The request syntax for creating a new role via the API is shown below. Documentation for each parameter can be found here: https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateRole.html. Note that `AssumeRolePolicyDocument` must be passed in as a JSON string, which can be done with `json.dumps()`. This document is the role's trust policy. Here, it allows the Redshift service (`redshift.amazonaws.com`) to assume the new role, so that our cluster can use the role's permissions when it accesses other AWS services such as S3.

```

response = client.create_role(
    Path='string',
    RoleName='string',
    AssumeRolePolicyDocument=json.dumps({
        "Version":"2012-10-17",
        "Statement":[{
            "Effect":"Allow|Deny",
            "Principal":{
                "Service":["<service_name>.amazonaws.com"]},
            "Action":[<Action>]}]}),
    Description='string',
    MaxSessionDuration=123,
    PermissionsBoundary='string',
    Tags=[
        {
            'Key': 'string',
            'Value': 'string'
        },
    ]
)
```
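Filled in for our use case, the template's `AssumeRolePolicyDocument` reduces to a small trust policy. Below is a minimal sketch that builds it as a Python dict and serializes it with `json.dumps()`, using the same values as the `create_role` call in the next cell. The helper name `trust_policy_json` is hypothetical.

```python
import json

def trust_policy_json(service="redshift.amazonaws.com"):
    """Return an IAM trust policy, as a JSON string, letting `service` assume a role."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",  # a single string, not a list
            "Principal": {"Service": [service]},
            "Action": ["sts:AssumeRole"],
        }],
    }
    # create_role expects the document as a string, not a dict
    return json.dumps(policy)

print(trust_policy_json())
```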

In [94]:
try:
    iam.create_role(
    Path="/",
    RoleName=IAM_ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps({
        "Version":"2012-10-17",
        "Statement":[{
            "Effect":"Allow",
            "Principal":{
                "Service":["redshift.amazonaws.com"]},
            "Action":["sts:AssumeRole"]}]}),
    Description='Allow redshift cluster to assume role to access other services')
except Exception as e:
    logging.error(e)
    print(e)
print (True)

ERROR:root:An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhAdmin already exists.


An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhAdmin already exists.
True
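The `EntityAlreadyExists` error above is harmless when the notebook is re-run. Below is a minimal sketch that makes this step idempotent and returns the role's ARN in both cases. It assumes the `iam` client above and a trust policy JSON string like the one passed to `create_role`. It reads the error code from botocore's `ClientError.response`. `get_or_create_role` is a hypothetical helper, not the original notebook's code.

```python
def get_or_create_role(iam_client, role_name, trust_policy, description=""):
    """Create an IAM role, or fetch it if it already exists; return its ARN."""
    try:
        resp = iam_client.create_role(
            Path="/",
            RoleName=role_name,
            AssumeRolePolicyDocument=trust_policy,  # JSON string
            Description=description,
        )
    except Exception as e:
        code = getattr(e, "response", {}).get("Error", {}).get("Code")
        if code != "EntityAlreadyExists":
            raise  # surface real failures
        # The role is already there, so look it up instead
        resp = iam_client.get_role(RoleName=role_name)
    return resp["Role"]["Arn"]
```

Because both `create_role` and `get_role` return the role under `resp["Role"]`, the caller gets the same ARN whether or not the role was just created.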


### Attach an S3 Read-Only Policy to the IAM Role

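Instead of writing a read-only access policy from scratch, we can attach the AWS managed policy `AmazonS3ReadOnlyAccess`. This lets our Redshift cluster read objects from S3 but not write or delete them. Note that this managed policy applies to every bucket the account can access, not only our bucket; restricting access to a single bucket would require a custom policy.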

In [90]:
try:
    response = iam.attach_role_policy(
        RoleName=IAM_ROLE_NAME,
        PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')
except Exception as e:
    logging.error(e)
    print(e)
print (True)

True
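The trailing `print (True)` runs even if the `try` block fails, so it does not confirm that the policy was attached. Below is a minimal sketch of an explicit check, assuming the `iam` client and `IAM_ROLE_NAME` above. It uses IAM's `list_attached_role_policies` operation, which reports managed policies under `AttachedPolicies`. The helper name `has_policy` is hypothetical.

```python
S3_READ_ONLY_ARN = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'

def has_policy(iam_client, role_name, policy_arn=S3_READ_ONLY_ARN):
    """Return True if the managed policy `policy_arn` is attached to the role."""
    paginator = iam_client.get_paginator('list_attached_role_policies')
    for page in paginator.paginate(RoleName=role_name):
        for policy in page.get('AttachedPolicies', []):
            if policy['PolicyArn'] == policy_arn:
                return True
    return False

# Usage in the notebook:
# assert has_policy(iam, IAM_ROLE_NAME), "S3 read-only policy is not attached"
```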


### Get the ARN for our New IAM Role

In [91]:
response = iam.get_role(
    RoleName=IAM_ROLE_NAME
)
ROLE_ARN = response['Role']['Arn']

# Create a Redshift Cluster

Now that we have created an S3 bucket with permissions, added our data files to it, and created an IAM role that can read from S3, we will create a Redshift cluster. Recall that we have already saved the following required variables to our configuration file:

```
CLUSTER_TYPE = config.get("DWH", "CLUSTER_TYPE")
NODE_CT = config.get("DWH", "NODE_CT")
NODE_TYPE = config.get("DWH", "NODE_TYPE")
IAM_ROLE_NAME = config.get("DWH", "IAM_ROLE_NAME")
CLUSTER_ID = config.get("DWH", "CLUSTER_ID")
DB = config.get("DWH", "DB")
DB_USER = config.get("DWH", "DB_USER")
DB_PASSWORD = config.get("DWH", "DB_PASSWORD")
DB_PORT = config.get("DWH", "DB_PORT")
```
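For reference, here is a minimal sketch of reading these values with Python's `configparser`. The `[DWH]` section and key names come from the snippet above. The file contents are made-up placeholders, and they are loaded with `read_string` so the example runs on its own; the notebook itself would call `read(...)` on its real config file. The variable names are lowercase so they don't overwrite the notebook's own `config` and settings. Note that `get` always returns strings, so numeric values such as `NODE_CT` and `DB_PORT` need converting before they are passed to boto3.

```python
import configparser

# Placeholder values for illustration only, not the project's real settings
SAMPLE_CFG = """
[DWH]
CLUSTER_TYPE = multi-node
NODE_CT = 2
NODE_TYPE = dc2.large
IAM_ROLE_NAME = dwhAdmin
CLUSTER_ID = dwhCluster
DB = dwh
DB_USER = dwhuser
DB_PASSWORD = <change-me>
DB_PORT = 5439
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

cluster_type = sample_config.get("DWH", "CLUSTER_TYPE")  # returns a str
node_ct = sample_config.getint("DWH", "NODE_CT")         # getint converts "2" -> 2
db_port = sample_config.getint("DWH", "DB_PORT")
```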

The syntax for creating a new Redshift cluster can be found here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster
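Below is a minimal sketch of how the configuration values map onto `create_cluster`'s parameters. It assumes the variables above and the `ROLE_ARN` retrieved earlier. The helper `cluster_kwargs` is hypothetical and only builds the keyword arguments, so they can be inspected before anything is provisioned. `NumberOfNodes` is added only for multi-node clusters. Numeric values are cast to `int` because `config.get` returns strings.

```python
def cluster_kwargs(cluster_type, node_type, node_ct, cluster_id,
                   db, db_user, db_password, db_port, role_arn):
    """Build keyword arguments for boto3's redshift.create_cluster."""
    kwargs = {
        'ClusterType': cluster_type,
        'NodeType': node_type,
        'ClusterIdentifier': cluster_id,
        'DBName': db,
        'MasterUsername': db_user,
        'MasterUserPassword': db_password,
        'Port': int(db_port),
        # lets the cluster assume our S3 read-only role (e.g. for COPY)
        'IamRoles': [role_arn],
    }
    if cluster_type == 'multi-node':
        # NumberOfNodes is required when requesting a multi-node cluster
        kwargs['NumberOfNodes'] = int(node_ct)
    return kwargs

# Usage in the notebook (assumes DEFAULT_REGION, KEY and SECRET from earlier):
# redshift = boto3.client('redshift', region_name=DEFAULT_REGION,
#                         aws_access_key_id=KEY, aws_secret_access_key=SECRET)
# redshift.create_cluster(**cluster_kwargs(CLUSTER_TYPE, NODE_TYPE, NODE_CT,
#                         CLUSTER_ID, DB, DB_USER, DB_PASSWORD, DB_PORT, ROLE_ARN))
```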