# Data Preparation using AWS Glue Dev Endpoints and an Amazon SageMaker Notebook <a name="top"></a>



## Exercise
[(Back to the top)](#top)

In this notebook, we will do the following activities:
    
- Build a star (denormalized) schema from an OLTP schema in third normal form (3NF).
- Write the derived denormalized table in Parquet format, partitioned by key fields.
- Finally, orchestrate the pipeline by creating an AWS Glue Workflow.

Let's start by connecting to our AWS Glue Dev Endpoint, a persistent AWS Glue Spark development environment.

In [1]:
```python
spark.version
```

Starting Spark application

SparkSession available as 'spark'.

```
'2.4.5-amzn-0'
```

In [2]:
```python
spark.sql("show databases").show()
```


```
+------------+
|databaseName|
+------------+
|     default|
|     salesdb|
+------------+
```

In [3]:
```python
spark.sql("use salesdb")
```


```
DataFrame[]
```

In [4]:
```python
spark.sql("show tables").show()
```


```
+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| salesdb|          customer|      false|
| salesdb|     customer_site|      false|
| salesdb|     nyc_trips_csv|      false|
| salesdb|           product|      false|
| salesdb|  product_category|      false|
| salesdb|       sales_order|      false|
| salesdb|   sales_order_all|      false|
| salesdb|sales_order_detail|      false|
| salesdb|          supplier|      false|
+--------+------------------+-----------+
```

Regular Spark SQL commands work against these tables because the feature 'Use Glue Data Catalog as the Hive metastore' is enabled by default for our AWS Glue Dev Endpoint. As an optional exercise, you can run any Spark SQL commands against these tables.

To learn more, see [AWS Glue Data Catalog Support for Spark SQL Jobs](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-data-catalog-hive.html).

The tables above were pre-created for you by an AWS Glue crawler, and any Amazon EMR cluster configured to use the AWS Glue Data Catalog as its metastore can access them as well. But what about files in S3 (say, CSV files) that have no catalog table yet? The next section shows how to read them into a Spark DataFrame, register it as a table, and query it with SQL.

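Outside a Glue Dev Endpoint, an Amazon EMR cluster reads these same catalog tables when Spark's Hive client is pointed at the Glue Data Catalog. A minimal sketch, assuming you launch the cluster with boto3's `emr.run_job_flow`, builds the `Configurations` entry for that; the `spark-hive-site` classification and the factory class come from the EMR documentation, while the helper name `glue_catalog_configurations` is hypothetical.

```python
import json

# Hive client factory that makes Spark SQL use the AWS Glue Data Catalog
# as its metastore (value documented for Amazon EMR).
GLUE_CATALOG_FACTORY = (
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
)


def glue_catalog_configurations():
    """Return an EMR 'Configurations' list enabling the Glue Data Catalog for Spark SQL.

    Hypothetical helper: pass the result as Configurations=... to
    boto3's emr.run_job_flow(), or save it as JSON for the EMR console.
    """
    return [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": GLUE_CATALOG_FACTORY,
            },
        }
    ]


print(json.dumps(glue_catalog_configurations(), indent=2))
```

Keeping the configuration in a function makes it easy to reuse across cluster launches without retyping the class name.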
## Load CSV files from S3 into Spark and create tables to query with SQL

In [5]:
```python
# Boilerplate imports. The notebook already provides a SparkSession as `spark`;
# the remaining imports are for convenience in later cells.
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import Column
from pyspark.sql import Row  # A row of data in a DataFrame
from pyspark.sql import GroupedData  # Aggregation methods, returned by DataFrame.groupBy()
from pyspark.sql import DataFrameNaFunctions  # Methods for handling missing data (null values)
from pyspark.sql import DataFrameStatFunctions  # Methods for statistics functionality
from pyspark.sql import functions  # Built-in functions available for DataFrames
from pyspark.sql import types  # Data types available
from pyspark.sql import Window  # For working with window functions
# End boilerplate imports

# Read the CSV files, using the first line as the header. The bucket name is
# specific to the lab account; change it if your data lives elsewhere.
# Without a schema, every column is read as a string.
nytrip_df = spark.read.csv("s3://glue-labs-001-180486424913/data/nyc_trips_csv", header=True)
```


In [6]:
```python
# Print the schema of the CSV data read in the previous step
nytrip_df.printSchema()
```


```
root
 |-- vendor_name: string (nullable = true)
 |-- trip_pickup_datetime: string (nullable = true)
 |-- trip_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- are_amt: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amt: string (nullable = true)
 |-- tolls_amt: string (nullable = true)
 |-- total_amt: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
```

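Every column above is a `string` because the CSV was read without a schema. In Spark you would either pass `inferSchema=True` to `spark.read.csv` or cast columns explicitly, for example `nytrip_df.withColumn("trip_distance", functions.col("trip_distance").cast("double"))`. The plain-Python sketch below shows the same idea on two rows taken from the query output further down; the `CASTS` mapping is an assumption about which columns should be numeric.

```python
import csv
import io

# Two sample rows (a subset of columns) copied from the notebook's query output.
SAMPLE_CSV = """vendor_name,trip_pickup_datetime,passenger_count,trip_distance,total_amt
VTS,2010-03-12 21:37:00,1,4.5,13.9
CMT,2010-03-08 19:09:28,1,1.1,7.2
"""

# Assumed target types; any column not listed here stays a string.
CASTS = {"passenger_count": int, "trip_distance": float, "total_amt": float}


def read_typed(text):
    """Parse CSV text (every value arrives as str) and cast the selected columns."""
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        rows.append({name: CASTS.get(name, str)(value) for name, value in raw.items()})
    return rows


rows = read_typed(SAMPLE_CSV)
print(rows[0])
```

Casting once, right after the read, keeps later SQL (sums, averages, comparisons) from operating on text.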
In [7]:
```python
# Register the DataFrame as a temporary view so it can be queried with SQL.
# The view lives as long as this SparkSession.
nytrip_df.createOrReplaceTempView("temp_nytripdata")
```


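A temporary view is scoped to the SparkSession that created it and is never written to the Glue Data Catalog (Spark's `createGlobalTempView` is the variant shared across sessions of one application, registered under the `global_temp` database). As a rough analogy only, SQLite `TEMP` tables are scoped the same way to one connection. The sketch uses the standard-library `sqlite3` module and a throwaway database file to show that a second connection cannot see the first connection's temporary table; the table name simply mirrors the view above.

```python
import os
import sqlite3
import tempfile


def temp_table_visibility():
    """Return (row count seen by the creating connection, whether a second connection sees the table)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "catalog.db")
        session_a = sqlite3.connect(path)
        session_b = sqlite3.connect(path)
        try:
            # Analogy for createOrReplaceTempView: visible only to this connection.
            session_a.execute(
                "CREATE TEMP TABLE temp_nytripdata (vendor_name TEXT, total_amt REAL)"
            )
            session_a.executemany(
                "INSERT INTO temp_nytripdata VALUES (?, ?)",
                [("VTS", 13.9), ("CMT", 7.2)],
            )
            count_a = session_a.execute("SELECT COUNT(*) FROM temp_nytripdata").fetchone()[0]
            # The second connection resolves the name in its own schema and finds nothing.
            try:
                session_b.execute("SELECT COUNT(*) FROM temp_nytripdata")
                visible_to_b = True
            except sqlite3.OperationalError:
                visible_to_b = False
        finally:
            session_a.close()
            session_b.close()
    return count_a, visible_to_b


print(temp_table_visibility())
```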
In [8]:
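```python
# List the tables again: the temp view should appear with an empty database
# column and isTemporary = true (third column). Temp views are session-scoped
# and are not stored in the Glue Data Catalog.
spark.sql("use salesdb")
spark.sql("show tables").show()
```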


```
+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| salesdb|          customer|      false|
| salesdb|     customer_site|      false|
| salesdb|     nyc_trips_csv|      false|
| salesdb|           product|      false|
| salesdb|  product_category|      false|
| salesdb|       sales_order|      false|
| salesdb|   sales_order_all|      false|
| salesdb|sales_order_detail|      false|
| salesdb|          supplier|      false|
|        |   temp_nytripdata|       true|
+--------+------------------+-----------+
```

## Transform your data by denormalizing the tables and writing them in Parquet format
[(Back to the top)](#top)

In this activity, we denormalize two tables and write the result in Parquet format.


In [9]:
```python
# Run standard SQL queries against the temp view (or any table in the catalog)
spark.sql("select * from temp_nytripdata").show(5)
```


```
+-----------+--------------------+---------------------+---------------+-------------+------------+-------+---------+-------+-------+---------+---------+----+-----+
|vendor_name|trip_pickup_datetime|trip_dropoff_datetime|passenger_count|trip_distance|payment_type|are_amt|surcharge|mta_tax|tip_amt|tolls_amt|total_amt|year|month|
+-----------+--------------------+---------------------+---------------+-------------+------------+-------+---------+-------+-------+---------+---------+----+-----+
|        VTS| 2010-03-12 21:37:00|  2010-03-12 21:51:00|              1|          4.5|         CSH|   12.9|      0.5|    0.5|    0.0|      0.0|     13.9|2010|   03|
|        CMT| 2010-03-08 19:09:28|  2010-03-08 19:16:30|              1|          1.1|         Cas|    5.7|      1.0|    0.5|    0.0|      0.0|      7.2|2010|   03|
|        VTS| 2010-03-20 22:53:00|  2010-03-20 23:10:00|              1|         2.19|         CSH|   10.1|      0.5|    0.5|    0.0|      0.0|     11.1|2010|   03|
|        C
```

### Transform the dataset

Let's now denormalize the source tables where applicable and write the data in Parquet format to the destination location. Be sure to change the S3 output path in the write cell below to a bucket in your account.

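Denormalizing here means folding the `product_category` attributes into each `product` row, so the product dimension needs no further join at query time. The SQL in the cells below is an inner join on `category_id`; a minimal plain-Python sketch of the same hash-join idea follows. The rows are hypothetical, shaped like the two schemas printed in the next cell, and `denormalize_products` is an illustrative name, not part of Spark.

```python
# Hypothetical sample rows; only the columns used by the join are shown.
product_category = [
    {"category_id": 1, "category_name": "Kitchen"},
    {"category_id": 2, "category_name": "Garden"},
]
product = [
    {"product_id": 10, "name": "Kettle", "supplier_id": 7, "category_id": 1},
    {"product_id": 11, "name": "Hose", "supplier_id": 8, "category_id": 2},
    {"product_id": 12, "name": "Widget", "supplier_id": 9, "category_id": 99},  # no matching category
]


def denormalize_products(categories, products):
    """Inner-join products to categories on category_id using a hash join.

    Assumes category_id is unique in categories (presumably the table's key).
    Output columns match the notebook's product_df.
    """
    # Build phase: index the category table by the join key.
    name_by_id = {c["category_id"]: c["category_name"] for c in categories}
    rows = []
    # Probe phase: look up each product's category; unmatched products are
    # dropped, just as an inner join drops them.
    for p in products:
        if p["category_id"] in name_by_id:
            rows.append({
                "category_id": p["category_id"],
                "category_name": name_by_id[p["category_id"]],
                "product_id": p["product_id"],
                "name": p["name"],
                "supplier_id": p["supplier_id"],
            })
    return rows


for row in denormalize_products(product_category, product):
    print(row)
```

Spark chooses its own join strategy (broadcast, sort-merge, and so on); the sketch only shows what the result contains.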
In [12]:
```python
# Inspect the schemas of the two source tables before joining them
adf = spark.sql("select * from product_category limit 5")
adf.printSchema()
bdf = spark.sql("select * from product limit 5")
bdf.printSchema()
```


```
root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image_url: string (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- quantity_per_unit: integer (nullable = true)
 |-- unit_price: decimal(10,2) (nullable = true)
```

In [13]:
```python
# Build the denormalized product dimension by joining product_category and product
product_df = spark.sql("""
    SELECT a.category_id, a.category_name, b.product_id, b.name, b.supplier_id
    FROM product_category a
    JOIN product b ON a.category_id = b.category_id
""")
product_df.printSchema()
print(product_df.count())
```


```
root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)

1000
```

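The join returned 1000 rows. Whether that equals the number of products depends on two things: an inner join silently drops products whose `category_id` has no match, and duplicate `category_id` values on the category side multiply the matching product rows. In Spark SQL you could compare against `SELECT COUNT(*) FROM product` and list orphans with a `LEFT ANTI JOIN` against `product_category`. The plain-Python sketch below applies the same checks to hypothetical key lists; `join_health` is an illustrative name.

```python
from collections import Counter


def join_health(product_keys, category_keys):
    """Summarize how an inner join of products to categories will behave.

    product_keys: category_id of each product row.
    category_keys: category_id of each product_category row.
    """
    dim_keys = Counter(category_keys)
    # Anti-join: product rows whose key has no match are dropped by an inner join.
    orphans = [k for k in product_keys if k not in dim_keys]
    # Duplicate keys on the category side multiply the matching product rows.
    duplicated = sorted(k for k, n in dim_keys.items() if n > 1)
    # Counter returns 0 for missing keys, so orphans contribute nothing.
    joined = sum(dim_keys[k] for k in product_keys)
    return {
        "products": len(product_keys),
        "joined": joined,
        "orphans": orphans,
        "duplicate_category_ids": duplicated,
    }


# Hypothetical keys: one orphan product (99) and one duplicated category (2).
print(join_health([2, 2, 99], [1, 2, 2]))
```

If `joined` equals `products` and both lists are empty, the dimension has exactly one row per product.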
In [None]:
```python
# Optional (not run here): drop the category_id join key from the dimension
df2 = product_df.drop('category_id')
df2.printSchema()
```

In [14]:
```python
# Write the whole product denorm table as a single Parquet file (coalesce(1)).
# Update the S3 path below, replacing the bucket with your own.
product_df.coalesce(1).write.mode("overwrite").parquet("s3://glue-labs-001-180486424913/data/sales_analytics/product_dim/")
```


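The write cell above produces one unpartitioned file, while the outline at the top also mentions partitioning by key fields. With `DataFrameWriter.partitionBy`, for example `nytrip_df.write.partitionBy("year", "month").parquet(path)`, Spark writes one Hive-style `column=value` directory per distinct combination of partition values and leaves those columns out of the data files. The plain-Python sketch below only computes that directory layout so the effect is visible; the bucket and the April row are hypothetical.

```python
from collections import defaultdict


def partition_layout(rows, base_path, partition_cols):
    """Group rows by the Hive-style directory that Spark's partitionBy would use.

    Each directory is base_path/col1=value1/col2=value2/. The partition
    columns are dropped from the row data because the path encodes them.
    """
    layout = defaultdict(list)
    for row in rows:
        subdir = "/".join(f"{col}={row[col]}" for col in partition_cols)
        data = {k: v for k, v in row.items() if k not in partition_cols}
        layout[f"{base_path.rstrip('/')}/{subdir}/"].append(data)
    return dict(layout)


# Two rows from the NYC trips output above plus one hypothetical April row.
trips = [
    {"vendor_name": "VTS", "total_amt": "13.9", "year": "2010", "month": "03"},
    {"vendor_name": "CMT", "total_amt": "7.2", "year": "2010", "month": "03"},
    {"vendor_name": "VTS", "total_amt": "9.0", "year": "2010", "month": "04"},
]

# Hypothetical destination; replace the bucket with your own.
for path, data in partition_layout(trips, "s3://your-bucket/data/nyc_trips/", ["year", "month"]).items():
    print(path, len(data))
```

Partitioning on columns that queries filter by lets engines such as Athena skip whole directories; `coalesce(1)` instead trades parallelism for a single output file.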

### Now that your data is written to S3, switch to the AWS console and verify that one Parquet file was created at the S3 location above.

## Final step: crawl the transformed data and create a table in your catalog for querying

- Navigate to the Glue console at Services -> Glue
- From the left-hand panel menu, navigate to Data Catalog -> Crawlers.
- Click on the button ‘Add Crawler’ to create a new Glue Crawler.
- Fields to fill in:
    - Page: Add information about your crawler
        - Crawler name: **sales_analytics_crawler**
    - Page: Add a data store
        - Choose a data store: S3
        - Include path: **s3://###s3_bucket###/data/sales_analytics/**
    - Page: Choose an IAM role
        - IAM Role: Choose an existing IAM role **glue-labs-GlueServiceRole**
    - Page: Configure the crawler's output
        - Database:  Click on ‘Add database’ and enter database name as **sales_analytics**.
- Click on the button ‘Finish’ to create the crawler.
- Select the new crawler and click **Run crawler** to run it.

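The console steps above can also be scripted. A minimal sketch, assuming boto3 is installed and your credentials allow Glue access: `create_database`, `create_crawler` and `start_crawler` are boto3 Glue client methods, the crawler, database and role names mirror the console values above, and the bucket name is a placeholder you must supply. The request is built by a pure function so it can be inspected without calling AWS.

```python
CRAWLER_NAME = "sales_analytics_crawler"
DATABASE_NAME = "sales_analytics"
IAM_ROLE = "glue-labs-GlueServiceRole"  # role name or ARN


def crawler_request(bucket):
    """Build keyword arguments for glue.create_crawler (no AWS call is made)."""
    return {
        "Name": CRAWLER_NAME,
        "Role": IAM_ROLE,
        "DatabaseName": DATABASE_NAME,
        "Targets": {"S3Targets": [{"Path": f"s3://{bucket}/data/sales_analytics/"}]},
    }


def create_and_run_crawler(bucket, region_name=None):
    """Create the database and crawler, then start a crawl. Requires boto3 and AWS credentials."""
    import boto3  # imported here so this module loads without boto3 installed

    glue = boto3.client("glue", region_name=region_name)
    # Raises AlreadyExistsException if the database was already created in the console.
    glue.create_database(DatabaseInput={"Name": DATABASE_NAME})
    glue.create_crawler(**crawler_request(bucket))
    glue.start_crawler(Name=CRAWLER_NAME)


print(crawler_request("your-bucket"))
```

Run `create_and_run_crawler("your-bucket")` once; later crawls only need `start_crawler`.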

After the crawler finishes, the sales_analytics database in your catalog contains a table for the crawled data. You can query it with Amazon Athena, or from this notebook with Spark SQL as in the next cell.

In [None]:
```python
# Validate that the crawled table exists, then start querying
spark.sql("use sales_analytics")
spark.sql("show tables").show()
```

## Congratulations! You have completed this exercise and learned how to use Spark in your day-to-day work
[(Back to the top)](#top)
