# Building a Data Lake using AWS Glue

1. [Introduction](#Introduction)
2. [Activity 1 : CSV to Parquet conversion](#Activity-1-:-CSV-to-Parquet-conversion)
3. [Activity 2 : Building a Star Schema in your Datalake](#Activity-2-:-Building-a-Star-Schema-in-your-Datalake)
4. [Activity 3 : Building an AWS Glue Workflow](#Activity-3-:-Building-an-AWS-Glue-Workflow)

## Introduction

In this notebook, we will use AWS Glue to perform 3 activities:

- Convert a CSV dataset to Parquet, partitioned by key fields.
- Build a Star (Denormalized) Schema from an OLTP 3NF Schema.
- Build an AWS Glue Workflow to orchestrate the Glue Jobs.

Let's start by connecting to our Glue DevEndpoint, a persistent Glue development environment.

In [None]:
spark.version

In [None]:
spark.sql("show databases").show()

In [None]:
spark.sql("show tables").show()

## Activity 1 : CSV to Parquet conversion


The first dataset we will use is the NYC Taxi Trips CSV dataset, with 1.2B records covering 5 vendors and 8 years of data. We will partition the data in the analytics tier by vendor name, year and month, and catalog it in the AWS Glue Data Catalog.

### Crawl the Source Data

The first step is to run an AWS Glue Crawler on the raw dataset to create the table in the AWS Glue Data Catalog.

Create and run a Glue Crawler on the source data in S3:

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **nyc_trips_csv_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/nyc_trips_csv/**
    - Page: Choose an IAM role
        - IAM Role: Choose the **GlueServiceRole**
    - Page: Configure the crawler's output
        - Database: Click on ‘Add database’ and enter database name as **nyc_trips**.
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on 'Run crawler' to run the Crawler.
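
The console steps above can also be scripted. The following is a minimal sketch, assuming the boto3 Glue client calls `create_crawler` and `start_crawler` and assuming the **nyc_trips** database already exists; the helper names `crawler_request` and `create_and_run_crawler` are made up for illustration.

```python
def crawler_request(name, role, database, s3_path):
    """Build the keyword arguments for the Glue CreateCrawler API call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_run_crawler(glue, request):
    """Create the crawler described by `request`, then start it.

    `glue` is expected to be a boto3 Glue client, e.g. boto3.client("glue").
    """
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


# Same values as entered in the console steps above.
request = crawler_request(
    name="nyc_trips_csv_crawler",
    role="GlueServiceRole",
    database="nyc_trips",  # assumption: this database already exists
    s3_path="s3://###s3_bucket###/data/nyc_trips_csv/",
)
print(request)

# To run against AWS (needs credentials and the boto3 package):
# import boto3
# create_and_run_crawler(boto3.client("glue"), request)
```

Keeping the request as a plain dictionary makes it easy to reuse the same helper for the other crawlers in this notebook by changing only the name, database and path.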

Once the data is crawled, we can view the database and tables in the Glue Catalog and query the table as well:

### Transform the data to Parquet

In [None]:
spark.sql("use nyc_trips").show()

In [None]:
spark.sql("show tables").show()

In [None]:
df = spark.sql("select * from nyc_trips.nyc_trips_csv")
df.printSchema()

In [None]:
## We will simulate the Glue job arguments 
import sys
sys.argv = ["CSV2Parquet","--JOB_NAME", "CSV2Parquet"]

Let's start the code for the Glue Job:

In [None]:
## Glue boilerplate code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3, json

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
print (args['JOB_NAME']+" START...")
if 'sc' not in vars(): sc = SparkContext()
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Glue boilerplate code

In [None]:
db_name='nyc_trips'
tbl_name='nyc_trips_csv'
output_dir='s3://###s3_bucket###/data/nyc_trips_parquet/'

In [None]:
dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name)
dyf.printSchema()

In [None]:
# Write the data out in Parquet
glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", connection_options = {"path": output_dir, "partitionKeys": ['vendor_name', 'year', 'month']}, format = "parquet")
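
With `partitionKeys` set, the output is laid out under Hive-style `key=value` prefixes below the output path. The sketch below only illustrates that naming scheme in plain Python; `partition_prefix` is a hypothetical helper and the record values are made up, so treat it as an illustration rather than what Glue runs internally.

```python
def partition_prefix(output_dir, partition_keys, record):
    """Return the Hive-style S3 prefix a record would fall under."""
    parts = ["{}={}".format(key, record[key]) for key in partition_keys]
    return output_dir.rstrip("/") + "/" + "/".join(parts) + "/"


# Hypothetical record: the field values are invented for illustration.
record = {"vendor_name": "VTS", "year": 2015, "month": 7, "fare_amount": 12.5}

prefix = partition_prefix(
    "s3://###s3_bucket###/data/nyc_trips_parquet/",
    ["vendor_name", "year", "month"],
    record,
)
print(prefix)
```

Query engines that understand this layout can skip whole prefixes when a query filters on the partition columns.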

### Crawl the Transformed Data

Now that the output data is in Amazon S3, let's crawl this dataset in AWS Glue and query this data using Amazon Athena.

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **nyc_trips_parquet_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/nyc_trips_parquet/**
    - Page: Choose an IAM role
        - IAM Role: Choose the **GlueServiceRole**
    - Page: Configure the crawler's output
        - Database: Select database as **nyc_trips**
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on Run crawler to run the Crawler.

Navigate to the Athena console (Services -> Athena) to run queries on this dataset.

Note: You can set the query output location for Athena by clicking on Settings -> Query result location and setting the value to:

**s3://###s3_bucket###/athena-query-results/**
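
As an example of a query to try, the sketch below builds a request for the Athena `start_query_execution` call in boto3. It assumes the crawler named the new table `nyc_trips_parquet` (after the S3 folder); check the actual table name in the Glue Data Catalog before running it. The helper `athena_query_request` is hypothetical.

```python
def athena_query_request(sql, database, output_location):
    """Build keyword arguments for Athena's StartQueryExecution API call."""
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


# Assumed table name: verify it in the Glue Data Catalog after the crawl.
sql = """
SELECT vendor_name, year, month, COUNT(*) AS trips
FROM nyc_trips_parquet
GROUP BY vendor_name, year, month
ORDER BY vendor_name, year, month
"""

request = athena_query_request(
    sql,
    database="nyc_trips",
    output_location="s3://###s3_bucket###/athena-query-results/",
)
print(request["QueryString"])

# To run against AWS (needs credentials and the boto3 package):
# import boto3
# boto3.client("athena").start_query_execution(**request)
```

The same SQL can be pasted directly into the Athena query editor.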


## Activity 2 : Building a Star Schema in your Datalake

In this activity, we will denormalize an OLTP 3NF schema and write the result as Parquet. This activity demonstrates how Glue operations can perform powerful data transformations on input data:

![alt text](resources/denormalize.png "Building a Star Schema")

### Crawl the Source Data

The first step is to run an AWS Glue Crawler on the raw dataset to create the tables in the AWS Glue Data Catalog.

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **salesdb_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/salesdb/**
    - Page: Choose an IAM role
        - IAM Role: Choose the **GlueServiceRole**
    - Page: Configure the crawler's output
        - Database:  Click on ‘Add database’ and enter database name as **salesdb**.
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on Run crawler to run the Crawler.

### Transform the dataset

In [None]:
db_name='salesdb'
table1='customer'
table2='customer_site'
output_dir='s3://###s3_bucket###/data/sales_analytics/customer_dim'
print (output_dir)

# Read the Source Tables
cust_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
cust_site_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

#Join the Source Tables
customer_dim_dyf = Join.apply(cust_dyf,cust_site_dyf,
                       'cust_id', 'cust_id').drop_fields(['cust_id'])

# Write the denormalized CUSTOMER_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = customer_dim_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")


In [None]:
table1='product_category'
table2='product'
output_dir='s3://###s3_bucket###/data/sales_analytics/product_dim/'
print (output_dir)

# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
table2_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

#Join the Source Tables
product_dim_dyf = Join.apply(table1_dyf,table2_dyf,
                       'category_id', 'category_id').drop_fields(['category_id'])

# Write the denormalized PRODUCT_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = product_dim_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")


In [None]:
table1='supplier'
output_dir='s3://###s3_bucket###/data/sales_analytics/supplier_dim/'
print (output_dir)

# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)


# Write the SUPPLIER_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = table1_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")

In [None]:
table1='sales_order_detail'
table2='sales_order'
output_dir='s3://###s3_bucket###/data/sales_analytics/sales_order_fact/'
print (output_dir)

For the 'sales_order_fact' table, we will try a different approach:

- Convert the Glue DynamicFrames to Spark DataFrames
- Register the Spark DataFrames as Spark temporary views
- Use Spark SQL to build and write out the target dataset

This shows that Glue DynamicFrames and Spark DataFrames can be converted into one another, so you can get the best of both worlds.

In [None]:
# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
table2_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

In [None]:
table1_dyf.printSchema()

In [None]:
table2_dyf.printSchema()

In [None]:
# Register each DataFrame under a view name that matches its source table
table1_dyf.toDF().createOrReplaceTempView("sales_order_detail_v")
table2_dyf.toDF().createOrReplaceTempView("sales_order_v")

In [None]:
# Write the denormalized SALES_ORDER_FACT table
df=spark.sql("SELECT a.*, b.site_id, b.order_date,b.ship_mode \
FROM sales_order_detail_v a, sales_order_v b \
WHERE a.order_id=b.order_id")
df.printSchema()
print(df.count())
df.coalesce(1).write.mode("OVERWRITE").parquet("s3://###s3_bucket###/data/sales_analytics/sales_order_fact/")
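
The join above attaches the order header columns (`site_id`, `order_date`, `ship_mode`) to every order detail row. To see the shape of the result without a Spark cluster, the same kind of join can be tried locally with Python's built-in `sqlite3` module. This is only an illustrative sketch: the columns `line_number` and `quantity` and all row values are invented, and the real tables have more columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Tiny stand-ins for the two source tables (hypothetical columns and rows).
conn.executescript("""
CREATE TABLE sales_order (order_id INTEGER, site_id INTEGER, order_date TEXT, ship_mode TEXT);
CREATE TABLE sales_order_detail (order_id INTEGER, line_number INTEGER, quantity INTEGER);
INSERT INTO sales_order VALUES (1, 10, '2020-01-05', 'AIR'), (2, 11, '2020-01-06', 'GROUND');
INSERT INTO sales_order_detail VALUES (1, 1, 3), (1, 2, 1), (2, 1, 7);
""")

# One output row per detail row, widened with the order header columns.
rows = conn.execute("""
SELECT d.*, o.site_id, o.order_date, o.ship_mode
FROM sales_order_detail d
JOIN sales_order o ON d.order_id = o.order_id
ORDER BY d.order_id, d.line_number
""").fetchall()

for row in rows:
    print(row)
```

The fact table keeps the grain of the detail table: three detail rows in, three fact rows out.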

Now that the output data is in Amazon S3, let's crawl this dataset in AWS Glue and query this data using Amazon Athena.

### Crawl the Transformed Data

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **sales_analytics_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/sales_analytics/**
    - Page: Choose an IAM role
        - IAM Role: Choose the **GlueServiceRole**
    - Page: Configure the crawler's output
        - Database:  Click on ‘Add database’ and enter database name as **sales_analytics**.
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on Run crawler to run the Crawler.


## Activity 3 : Building an AWS Glue Workflow

Let's now build an AWS Glue Workflow for these transformations. The first step is to create the Glue Jobs. As the code is already available, we will simply call the AWS Glue APIs to create them.

In [None]:
%local

import boto3

acct_number=boto3.client('sts').get_caller_identity().get('Account')
bucket='###s3_bucket###'

# Create the Glue Spark Jobs
glue = boto3.client("glue")

for job_name in ['Load_SALES_ORDER_FACT', 'Load_PRODUCT_DIM', 'Load_CUSTOMER_DIM','Load_SUPPLIER_DIM']:
    response=glue.create_job(Name=job_name,
                         Role="arn:aws:iam::%s:role/GlueServiceRole"%acct_number,
                         ExecutionProperty={'MaxConcurrentRuns': 1},
                         Command={'Name': 'glueetl',
                                  'ScriptLocation': 's3://%s/scripts/glue/%s.py'%(bucket,job_name),
                                  'PythonVersion': '3'},
                         DefaultArguments={'--TempDir': 's3://%s/temp'%bucket,
                                           '--enable-continuous-cloudwatch-log': 'true',
                                           '--enable-glue-datacatalog': '',
                                           '--enable-metrics': '',
                                           '--enable-spark-ui': 'true',
                                           '--spark-event-logs-path': 's3://%s/spark_glue_etl_logs/%s'%(bucket,job_name),
                                           '--job-bookmark-option': 'job-bookmark-disable',
                                           '--job-language': 'python',
                                           '--S3_BUCKET': bucket },
                         MaxRetries=0,
                         Timeout=2880,
                         MaxCapacity=3.0,
                         GlueVersion='1.0',
                         Tags={'Owner': 'TKO_Labs'}
                        )
    print (response)

Now that the jobs are created, let us build an AWS Glue Workflow to orchestrate them:

- Navigate to the Glue Console at Services -> Glue
- From the left-hand panel menu, choose Workflows and click on 'Add Workflow'
- Enter WorkFlow Name as 'Data_Transformation_Workflow' and click on 'Add Workflow'.

To create the Workflow:

- Trigger 1_Load_Dimensions:
  - Click on 'Add Trigger'
  - Select Tab 'Add New'
  - Enter Trigger Name as '1_Load_Dimensions'
  - Choose Trigger type as 'On demand'
  - Click on 'Add' to create the Trigger
  - Select the trigger and click on 'Add Action' -> 'Add jobs/crawlers to trigger'
  - Select the jobs - Load_CUSTOMER_DIM, Load_PRODUCT_DIM and Load_SUPPLIER_DIM
  - Click on 'Add' to save changes
- Trigger 2_Load_Facts:
  - Click on any of the jobs e.g. 'Load_CUSTOMER_DIM'
  - Click on 'Add Trigger'
  - Select Tab 'Add New'
  - Enter Trigger Name as '2_Load_Facts'
  - Choose Trigger logic as 'Start after ALL watched event'
  - Click on 'Add' to create the Trigger
  - Select the trigger and click on 'Add Action' -> 'Add jobs/crawlers to watch'
  - Select the other jobs that are part of the load dimensions step -  Load_PRODUCT_DIM and Load_SUPPLIER_DIM
  - Click on 'Add' to save changes
  - Select the trigger and click on 'Add Action' -> 'Add jobs/crawlers to trigger'
  - Select the job - Load_SALES_ORDER_FACT
  - Click on 'Add' to save changes
- Trigger 3_Update_Metadata:
  - Click on the job e.g. 'Load_SALES_ORDER_FACT'
  - Click on 'Add Trigger'
  - Select Tab 'Add New'
  - Enter Trigger Name as '3_Update_Metadata'
  - Click on 'Add' to create the Trigger
  - Select the trigger and click on 'Add Action' -> 'Add jobs/crawlers to trigger'
  - From the 'Crawlers' tab, select the crawler 'sales_analytics_crawler'.
  - Click on 'Add' to save changes
  
Your workflow should look like this:

![title](resources/Glue_Workflow.png)
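
For reference, the three triggers can also be described as requests for the boto3 Glue `create_trigger` call. This is a sketch under assumptions, not the lab's official script: it assumes each watched job should reach the `SUCCEEDED` state before the next step starts, and the helpers `succeeded`, `trigger_requests` and `build_workflow` are hypothetical names.

```python
WORKFLOW = "Data_Transformation_Workflow"
DIMENSION_JOBS = ["Load_CUSTOMER_DIM", "Load_PRODUCT_DIM", "Load_SUPPLIER_DIM"]


def succeeded(job_name):
    """Condition that is met when the given job finishes successfully (assumed state)."""
    return {"LogicalOperator": "EQUALS", "JobName": job_name, "State": "SUCCEEDED"}


def trigger_requests():
    """Return keyword arguments for the three CreateTrigger calls."""
    return [
        {   # Starts the dimension loads when the workflow is run.
            "Name": "1_Load_Dimensions",
            "WorkflowName": WORKFLOW,
            "Type": "ON_DEMAND",
            "Actions": [{"JobName": job} for job in DIMENSION_JOBS],
        },
        {   # Starts the fact load after ALL dimension loads have succeeded.
            "Name": "2_Load_Facts",
            "WorkflowName": WORKFLOW,
            "Type": "CONDITIONAL",
            "StartOnCreation": True,
            "Predicate": {
                "Logical": "AND",
                "Conditions": [succeeded(job) for job in DIMENSION_JOBS],
            },
            "Actions": [{"JobName": "Load_SALES_ORDER_FACT"}],
        },
        {   # Re-crawls the analytics data after the fact load.
            "Name": "3_Update_Metadata",
            "WorkflowName": WORKFLOW,
            "Type": "CONDITIONAL",
            "StartOnCreation": True,
            "Predicate": {"Conditions": [succeeded("Load_SALES_ORDER_FACT")]},
            "Actions": [{"CrawlerName": "sales_analytics_crawler"}],
        },
    ]


def build_workflow(glue):
    """Create the workflow and its triggers; `glue` is a boto3 Glue client."""
    glue.create_workflow(Name=WORKFLOW)
    for request in trigger_requests():
        glue.create_trigger(**request)


trigger_list = trigger_requests()
for request in trigger_list:
    print(request["Name"], request["Type"])

# To run against AWS (needs credentials and the boto3 package):
# import boto3
# build_workflow(boto3.client("glue"))
```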
  


Our Glue Workflow is complete. Let us now run this workflow: 

- Select the workflow and click on Action -> Run
- You can view the run details from the 'History' tab by selecting the workflow run and clicking on 'View Run Details'
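
The workflow can also be started and monitored from code. The sketch below assumes the boto3 Glue client calls `start_workflow_run` and `get_workflow_run`; `run_workflow` is a hypothetical helper, and `StubGlue` is an offline stand-in included only so the polling logic can be tried without AWS.

```python
import time

# Run states after which polling stops (assumed terminal states).
FINISHED = {"COMPLETED", "STOPPED", "ERROR"}


def run_workflow(glue, name, poll_seconds=30):
    """Start a workflow run and poll until it reaches a finished state."""
    run_id = glue.start_workflow_run(Name=name)["RunId"]
    while True:
        run = glue.get_workflow_run(Name=name, RunId=run_id)["Run"]
        if run["Status"] in FINISHED:
            return run_id, run["Status"]
        time.sleep(poll_seconds)


class StubGlue:
    """Hypothetical offline stand-in that mimics the two calls used above."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def start_workflow_run(self, Name):
        return {"RunId": "run-1"}

    def get_workflow_run(self, Name, RunId):
        return {"Run": {"WorkflowRunId": RunId, "Status": next(self._statuses)}}


# Dry run against the stub: one RUNNING poll, then COMPLETED.
result = run_workflow(
    StubGlue(["RUNNING", "COMPLETED"]),
    "Data_Transformation_Workflow",
    poll_seconds=0,
)
print(result)

# To run against AWS (needs credentials and the boto3 package):
# import boto3
# print(run_workflow(boto3.client("glue"), "Data_Transformation_Workflow"))
```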

## Wrap-up

In this notebook, we worked through three exercises:

1. A CSV to Parquet conversion, where we observed how easy it is to transform data and write it to S3, partitioned by key fields, using AWS Glue.
2. A more complex transformation, denormalizing a 3NF OLTP schema, where we observed how easy it is to perform complex data transformations using the power of both Glue DynamicFrames and Spark SQL.
3. Building and executing an AWS Glue Workflow to orchestrate the Glue Jobs.
