# Data Preperation using AWS Glue Dev Endpoints and Sagemaker Notebook <a name="top"></a>



## Exercise: 
[(Back to the top)](#top)

In this notebook, we will do the following activities:
    
- Build a Star (Denormalized) Schema from an OLTP 3NF (3rd Normal Form) Schema.
- Write the derived table for denorm data set in parquet format partitioned out by key fields.
- Finally, orchestrate the pipeline to create an AWS Glue Workflow.

Let's start by connecting to our our AWS Glue Dev Endpoint - a persistent AWS Glue Spark  Development environment.

In [1]:
spark.version

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1592850335670_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

u'2.4.3'

In [2]:
spark.sql("show databases").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+
|databaseName|
+------------+
|     default|
|    default2|
|   nyc_trips|
|     salesdb|
+------------+

In [3]:
spark.sql("use salesdb")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [4]:
spark.sql("show tables").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| salesdb|          customer|      false|
| salesdb|     customer_site|      false|
| salesdb|           product|      false|
| salesdb|  product_category|      false|
| salesdb|       sales_order|      false|
| salesdb|   sales_order_all|      false|
| salesdb|sales_order_detail|      false|
| salesdb|          supplier|      false|
+--------+------------------+-----------+

Note that regular Spark SQL commands work great as we have enabled the feature 'Use Glue Data Catalog as the Hive metastore' for our AWS Glue Dev Endpoint by default. You can choose to run any spark-sql commands against these tables as an optional exercise 

You can click on the link to read more on [AWS Glue Data Catalog Support for Spark SQL Jobs](
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-data-catalog-hive.html)

## Data Preperation by denormalizing the tables and writing them in parquet format
[(Back to the top)](#top)

In this activity, we will denormalize an OLTP 3NF schema to Parquet. This activity demonstrates the using AWS Glue operations to perform powerful data transformations on input data:

![alt text](../resources/denormalize.png "Building a Star Schema")

In [5]:
spark.sql("use salesdb").show()
spark.sql("show tables").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| salesdb|          customer|      false|
| salesdb|     customer_site|      false|
| salesdb|           product|      false|
| salesdb|  product_category|      false|
| salesdb|       sales_order|      false|
| salesdb|   sales_order_all|      false|
| salesdb|sales_order_detail|      false|
| salesdb|          supplier|      false|
+--------+------------------+-----------+

### Transform the dataset

Let's now denormalize the source tables where applicable and write out the data in Parquet format to the destination location. Not to change the S3 output_path in the cell below to appropriate bucket in your account

In [6]:
## We will simulate the Glue job arguments 
import sys
sys.argv = ["LabDataPrepJob","--JOB_NAME", "LabDataPrepJob"]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Let's start the code for the AWS Glue Job:

In [7]:
## Glue boilerplate code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3, json

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
print (args['JOB_NAME']+" START...")
if 'sc' not in vars(): sc = SparkContext()
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Glue boilerplate code

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

LabDataPrepJob START...

In [9]:
db_name='salesdb'
table1='customer'
table2='customer_site'
output_dir='s3://glue-labs-001-518010202968/data/sales_analytics/customer_dim'
print (output_dir)

# Read the Source Tables
cust_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
cust_site_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

# Join the two Source Tables
customer_dim_dyf = Join.apply(cust_dyf,cust_site_dyf,
                       'cust_id', 'cust_id').drop_fields(['cust_id'])

# Write the denormalized CUSTOMER_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = customer_dim_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://glue-labs-001-518010202968/data/sales_analytics/customer_dim
<awsglue.dynamicframe.DynamicFrame object at 0x7fbf9b111410>

In [10]:
table1='product_category'
table2='product'
output_dir='s3://glue-labs-001-518010202968/data/sales_analytics/product_dim/'
print (output_dir)

# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
table2_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

#Join the Source Tables
product_dim_dyf = Join.apply(table1_dyf,table2_dyf,
                       'category_id', 'category_id').drop_fields(['category_id'])

# Write the denormalized CUSTOMER_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = product_dim_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://glue-labs-001-518010202968/data/sales_analytics/product_dim/
<awsglue.dynamicframe.DynamicFrame object at 0x7fbf9b0b2150>

In [11]:
table1='supplier'
output_dir='s3://glue-labs-001-518010202968/data/sales_analytics/supplier_dim/'
print (output_dir)

# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)


# Write the denormalized CUSTOMER_DIM table in Parquet
glueContext.write_dynamic_frame.from_options(frame = table1_dyf, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://glue-labs-001-518010202968/data/sales_analytics/supplier_dim/
<awsglue.dynamicframe.DynamicFrame object at 0x7fbf9b111790>

In [12]:
table1='sales_order_detail'
table2='sales_order'
output_dir='s3://glue-labs-001-518010202968/data/sales_analytics/sales_order_fact/'
print (output_dir)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://glue-labs-001-518010202968/data/sales_analytics/sales_order_fact/

For the 'sales_order_fact' table, we will try a different approach - 

- We will convert the AWS Glue DynamicFrame to a Spark DataFrame
- Register the Spark Dataframe to a Spark Temporary View
- Use Spark SQL to build the write out the target dataset.

This demonstrates that AWS Glue DynamicFrames and Spark Dataframes are interchangeable and you can get the best of both worlds by using both the options where suitable.

In [13]:
# Read the Source Tables
table1_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table1)
table2_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = table2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
table1_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- LINE_ID: int
|-- LINE_NUMBER: int
|-- ORDER_ID: int
|-- PRODUCT_ID: int
|-- QUANTITY: int
|-- UNIT_PRICE: decimal
|-- DISCOUNT: decimal
|-- SUPPLY_COST: decimal
|-- TAX: decimal

In [15]:
table2_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- ORDER_ID: int
|-- SITE_ID: double
|-- ORDER_DATE: timestamp
|-- SHIP_MODE: string

In [16]:
table1_dyf.toDF().createOrReplaceTempView("sales_order_v")
table2_dyf.toDF().createOrReplaceTempView("sales_order_detail_v")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Notice the output column isTemporary to finf yout temp tables that are activie for this session
spark.sql("show tables").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| salesdb|            customer|      false|
| salesdb|       customer_site|      false|
| salesdb|             product|      false|
| salesdb|    product_category|      false|
| salesdb|         sales_order|      false|
| salesdb|     sales_order_all|      false|
| salesdb|  sales_order_detail|      false|
| salesdb|            supplier|      false|
|        |sales_order_detail_v|       true|
|        |       sales_order_v|       true|
+--------+--------------------+-----------+

In [24]:
# Write the denormalized SALES_ORDER_FACT table
df=spark.sql("SELECT a.*, b.site_id, b.order_date,b.ship_mode \
FROM sales_order_detail_v b, sales_order_v a \
WHERE a.order_id=b.order_id")
df.printSchema()
print(df.count())
df.coalesce(1).write.mode("OVERWRITE").parquet("s3://glue-labs001-518010202968/data/sales_analytics/sales_order_fact/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- LINE_ID: integer (nullable = true)
 |-- LINE_NUMBER: integer (nullable = true)
 |-- ORDER_ID: integer (nullable = true)
 |-- PRODUCT_ID: integer (nullable = true)
 |-- QUANTITY: integer (nullable = true)
 |-- UNIT_PRICE: decimal(38,10) (nullable = true)
 |-- DISCOUNT: decimal(38,10) (nullable = true)
 |-- SUPPLY_COST: decimal(38,10) (nullable = true)
 |-- TAX: decimal(38,10) (nullable = true)
 |-- site_id: double (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- ship_mode: string (nullable = true)

98000

Note that we used the power of Spark SQL for this transformation instead of AWS Glue DynamicFrame transforms. This dataset is small so we also coalesced the number of partitions in the Spark dataframe to 1 to ensure only 1 file gets written to our output location.

In [37]:
# You can also run linux commands on your dev endpoint machine.
%%sh
ls -l /tmp
pip install -U scikit-learn


total 116
drwxr-xr-x 3 root       root       4096 Jun 18 01:23 brazil-npm-registry
-rw-r--r-- 1 ec2-user   ec2-user    543 Jun 18 01:33 core-js-banners
drwxr-xr-x 2 role-agent role-agent 4096 Jun 23 19:16 hsperfdata_role-agent
drwxr-xr-x 3 role-agent role-agent 4096 Jun 23 19:16 jetty-localhost-9081-role-proxy-agent.war-_-any-3270408570273192286.dir
drwxr-xr-x 3 root       root       4096 Jun 18 01:23 lib
-rw-r--r-- 1 root       root          0 Jun 23 19:18 lifecycle-config-reconnect-dev-endpoint-daemon.lock
-rw-r--r-- 1 root       root          0 Jun 23 19:18 lifecycle-config-v2-dev-endpoint-daemon.lock
drwxr-xr-x 3 ec2-user   ec2-user   4096 Jun 18 01:33 npm-12080-1555bd66
drwxr-xr-x 3 ec2-user   ec2-user   4096 Jun 18 01:35 npm-12474-c4af1476
drwxr-xr-x 3 ec2-user   ec2-user   4096 Jun 18 01:35 npm-12503-7b4d78a8
drwxr-xr-x 3 ec2-user   ec2-user   4096 Jun 18 01:36 npm-12716-137db1a0
drwxr-xr-x 3 ec2-user   ec2-user   4096 Jun 18 01:36 npm-12756-5e28ef95
drwxr-xr-x 3 ec2-user   ec2-

Now that the output data is in Amazon S3, let's crawl this dataset in AWS Glue and query this data using Amazon Athena.

### Step 3 : Crawl the Transformed Data

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **sales_analytics_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/sales_analytics/**
    - Page: Choose an IAM role
        - IAM Role: Choose an existing IAM role **glue-labs-GlueServiceRole**
    - Page: Configure the crawler's output
        - Database:  Click on ‘Add database’ and enter database name as **sales_analytics**.
- Click on the button ‘Finish’ to create the crawler.
- Select the new Crawler and click on Run crawler to run the Crawler.


In [39]:
spark.sql("use sales_analytics").show()
spark.sql("show tables").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

+---------------+--------------------+-----------+
|       database|           tableName|isTemporary|
+---------------+--------------------+-----------+
|sales_analytics|        customer_dim|      false|
|sales_analytics|         product_dim|      false|
|               |sales_order_detail_v|       true|
|               |       sales_order_v|       true|
+---------------+--------------------+-----------+

## Building an AWS Glue Workflow
[(Back to the top)](#top)

SO far in this exercise, you have created and tested your code. Once you developed the code and jobs you need, its time to orchestrate multiple jobs to deploy it and schedule to run on a periodic bases. This section helps you build the workflow. An AWS Glue workflow is an orchestration used to visualize and manage the relationship and execution of multiple AWS Glue triggers, jobs and crawlers. Let's now build an AWS Glue Workflow for the same. 

The 1st step is to create the AWS Glue Jobs. As the AWS Glue ETL code is already staged in our Amazon S3 bucket, we will simply call the AWS Glue APIs to create the AWS Glue Jobs.

In [47]:
%local

import boto3

acct_number=boto3.client('sts').get_caller_identity().get('Account')
bucket='glue-labs-001-518010202968'

# Create the AWS Glue Spark Jobs
glue = boto3.client("glue")

for job_name in ['Load_SALES_ORDER_FACT', 'Load_PRODUCT_DIM', 'Load_CUSTOMER_DIM','Load_SUPPLIER_DIM']:
    response=glue.create_job(Name=job_name,
                         Role=f"arn:aws:iam::{acct_number}:role/glue-labs-GlueServiceRole",
                         ExecutionProperty={'MaxConcurrentRuns': 1},
                         Command={'Name': 'glueetl',
                                  'ScriptLocation': f's3://{bucket}/scripts/{job_name}.py',
                                  'PythonVersion': '3'},
                         DefaultArguments={'--TempDir': f's3://{bucket}/temp',
                                           '--enable-continuous-cloudwatch-log': 'true',
                                           '--enable-glue-datacatalog': '',
                                           '--enable-metrics': '',
                                           '--enable-spark-ui': 'true',
                                           '--spark-event-logs-path': f's3://{bucket}/spark_glue_etl_logs/{job_name}',
                                           '--job-bookmark-option': 'job-bookmark-disable',
                                           '--job-language': 'python',
                                           '--S3_BUCKET': bucket },
                         MaxRetries=0,
                         Timeout=2880,
                         MaxCapacity=3.0,
                         GlueVersion='1.0',
                         Tags={'Owner': 'Glue_Labs'}
                        )
    print (response)

{'Name': 'Load_SALES_ORDER_FACT', 'ResponseMetadata': {'RequestId': 'cab5485d-21df-44ba-ba1b-103d95c5f6f8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 14:59:31 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '32', 'connection': 'keep-alive', 'x-amzn-requestid': 'cab5485d-21df-44ba-ba1b-103d95c5f6f8'}, 'RetryAttempts': 0}}
{'Name': 'Load_PRODUCT_DIM', 'ResponseMetadata': {'RequestId': '3342d9ef-c907-4612-8f23-9a3ab757e337', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 14:59:31 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '27', 'connection': 'keep-alive', 'x-amzn-requestid': '3342d9ef-c907-4612-8f23-9a3ab757e337'}, 'RetryAttempts': 0}}
{'Name': 'Load_CUSTOMER_DIM', 'ResponseMetadata': {'RequestId': '1c218cac-fd79-4d0a-addd-125b1addf1c4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 14:59:31 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '28', 'connection': '

The Workflow consists of 3 AWS Glue triggers:

- The 1st OnDemand Trigger loads the Dimension tables.
- The 2nd Conditional Trigger loads the Fact table.
- The 3rd Conditional Trigger updated the table definitions in the Catalog.

In [50]:
%local

glue = boto3.client("glue")

# Create the AWS Glue Workflow
response = glue.create_workflow(
    Name='Sales_Analytics_Workflow2',
    Description='Sales Analytics Workflow v1.0'
)
print (response)

# 1. The Trigger to load the Dimensions table
response = glue.create_trigger(
    Name='1_Load_Dimensions',
    WorkflowName='Sales_Analytics_Workflow',
    Type='ON_DEMAND',
    Actions=[{'JobName': 'Load_CUSTOMER_DIM',
    'Arguments': {'--job-bookmark-option': 'job-bookmark-disable'},
    'Timeout': 2880},
   {'JobName': 'Load_PRODUCT_DIM',
    'Arguments': {'--job-bookmark-option': 'job-bookmark-disable'},
    'Timeout': 2880},
   {'JobName': 'Load_SUPPLIER_DIM',
    'Arguments': {'--job-bookmark-option': 'job-bookmark-disable'},
    'Timeout': 2880}]
)
print (response)  

# 2. The Trigger to load the Facts table
response = glue.create_trigger(
    Name='2_Load_Facts',
    WorkflowName='Sales_Analytics_Workflow',
    Type='CONDITIONAL',
    StartOnCreation=True,
    Actions=[{'JobName': 'Load_SALES_ORDER_FACT'}],
    Predicate= {'Logical': 'AND',
    'Conditions': [{'LogicalOperator': 'EQUALS',
                  'JobName': 'Load_SUPPLIER_DIM',
                   'State': 'SUCCEEDED'},
                  {'LogicalOperator': 'EQUALS',
                   'JobName': 'Load_PRODUCT_DIM',
                   'State': 'SUCCEEDED'},
                  {'LogicalOperator': 'EQUALS',
                   'JobName': 'Load_CUSTOMER_DIM',
                   'State': 'SUCCEEDED'}]
               }
)
print (response)  

# Finally, the Trigger for the Crawler
response = glue.create_trigger(
    Name='3_Update_Catalog',
    WorkflowName='Sales_Analytics_Workflow',
    Type='CONDITIONAL',
    StartOnCreation=True,
    Actions=[{'CrawlerName': 'sales_analytics_crawler'}],
    Predicate= {'Logical': 'ANY',
   'Conditions': [{'LogicalOperator': 'EQUALS',
     'JobName': 'Load_SALES_ORDER_FACT',
     'State': 'SUCCEEDED'}]}
)
print (response)     

{'Name': 'Sales_Analytics_Workflow2', 'ResponseMetadata': {'RequestId': 'd134f5aa-b171-4825-9087-442daed7f209', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 15:16:58 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '36', 'connection': 'keep-alive', 'x-amzn-requestid': 'd134f5aa-b171-4825-9087-442daed7f209'}, 'RetryAttempts': 0}}
{'Name': '1_Load_Dimensions', 'ResponseMetadata': {'RequestId': '4769c63b-a5f4-4429-bbbb-e02564faf1ca', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 15:16:58 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '28', 'connection': 'keep-alive', 'x-amzn-requestid': '4769c63b-a5f4-4429-bbbb-e02564faf1ca'}, 'RetryAttempts': 0}}
{'Name': '2_Load_Facts', 'ResponseMetadata': {'RequestId': '8cd96fb7-2725-4ec6-8445-0f802ec6f110', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 24 Jun 2020 15:16:58 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '23', 'connection': '

Let's review the AWS Glue Workflow created:
    
- Navigate to the Glue Console at Service -> Glue
- From the left-hand panel menu, choose Workflows
- Select the Workflow 'Sales_Analytics_Workflow'.

Your workflow should look like this:

![title](../resources/Glue_Workflow.png)
  

Let us now run this workflow: 

- Select the workflow and click on 'Action - > Run' to launch the workflow
- You can view the run details and visually track the progress of each acitvity in the workflow from the 'History' tab by selecting the workflow run and clicking on 'View Run Details'

![title](../resources/View_Run_Details.png)


## Wrap-up
[(Back to the top)](#top)


In this notebook, we ran exercises to perform: 

1. A CSV to Parquet conversion and observed how easy it is to transform and write data to an Amazon S3 bucket using AWS Glue, partitioned by key fields.
2. A more complex transformation - denormalizing of a 3NF OLTP schema, and we observed how easy it is to perform complex data transformations using the power of both AWS Glue DynamicFrames and Spark SQL.
3. We built and executed an AWS Glue Workflow to orchestrate multiple AWS Glue Jobs.
