# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [5]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2


####  Run this cell to set up and start your interactive session.


In [144]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col
from awsglue.job import Job




In [149]:
Sc = SparkContext.getOrCreate()
glueContext = GlueContext(Sc)
spark = glueCon.spark_session
job = Job(glueCon)




### catalog: database and table names, s3 output bucket

In [2]:
db_name = "ocean-dbt"
tbl_name = "ocean_input_data"
s3_write_bucket = "s3://ocean-input-data/"




# Extract

#### Creating datasource using catalog table

In [6]:
dataframe = glueContext.create_dynamic_frame.from_catalog(database= db_name, table_name= tbl_name)




In [7]:
dataframe.printSchema()

root
|-- date: string
|-- bill number: string
|-- item desc: string
|-- time: string
|-- quantity: long
|-- rate: long
|-- tax: double
|-- discount: long
|-- total: double
|-- category: string


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [96]:
df_ocean = dataframe.toDF()
df_ocean.show()

+---------+-----------+--------------------+----------+--------+----+-----+--------+------+--------+
|     date|bill number|           item desc|      time|quantity|rate|  tax|discount| total|category|
+---------+-----------+--------------------+----------+--------+----+-----+--------+------+--------+
|01-Jan-20|   G0470115|MINERAL WATER(100...|1:15:11 pm|       1|  50|11.88|       0| 61.88|BEVERAGE|
|01-Jan-20|   G0470115|MONSOON MALABAR (...|1:15:11 pm|       1| 100|23.75|       0|123.75|BEVERAGE|
|01-Jan-20|   G0470116| MASALA CHAI CUTTING|1:17:35 pm|       1|  40|  9.5|       0|  49.5|BEVERAGE|
|01-Jan-20|   G0470117|MINERAL WATER(100...|1:19:55 pm|       1|  50|11.88|       0| 61.88|BEVERAGE|
|01-Jan-20|   G0470283|   MOROCCAN MINT TEA|1:20:18 am|       1|  45|10.69|       0| 55.69|BEVERAGE|
|01-Jan-20|   G0470283|MINERAL WATER(100...|1:20:18 am|       1|  50|11.88|       0| 61.88|BEVERAGE|
|01-Jan-20|   G0470118| MASALA CHAI CUTTING|1:21:34 pm|       1|  40|  9.5|       0|  49.5|

# Transform

In [97]:
# dropping rows if a row contains more than 5 null values
df_ocean = df_ocean.dropna(thresh=5)




In [103]:
df_ocean.show(5)

+---------+-----------+--------------------+----------+--------+----+-----+--------+------+--------+
|     date|bill number|           item desc|      time|quantity|rate|  tax|discount| total|category|
+---------+-----------+--------------------+----------+--------+----+-----+--------+------+--------+
|01-Jan-20|   G0470115|MINERAL WATER(100...|1:15:11 pm|       1|  50|11.88|       0| 61.88|BEVERAGE|
|01-Jan-20|   G0470115|MONSOON MALABAR (...|1:15:11 pm|       1| 100|23.75|       0|123.75|BEVERAGE|
|01-Jan-20|   G0470116| MASALA CHAI CUTTING|1:17:35 pm|       1|  40|  9.5|       0|  49.5|BEVERAGE|
|01-Jan-20|   G0470117|MINERAL WATER(100...|1:19:55 pm|       1|  50|11.88|       0| 61.88|BEVERAGE|
|01-Jan-20|   G0470283|   MOROCCAN MINT TEA|1:20:18 am|       1|  45|10.69|       0| 55.69|BEVERAGE|
+---------+-----------+--------------------+----------+--------+----+-----+--------+------+--------+
only showing top 5 rows


In [100]:
#ranaming multiple column names
df_ocean_rename = df_ocean.withColumnRenamed("item desc","Item_Description").withColumnRenamed("total","Total_Bill_Amount")




In [102]:
df_ocean_rename.show(5)

+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--------+
|     date|bill number|    Item_Description|      time|quantity|rate|  tax|discount|Total_Bill_Amount|category|
+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--------+
|01-Jan-20|   G0470115|MINERAL WATER(100...|1:15:11 pm|       1|  50|11.88|       0|            61.88|BEVERAGE|
|01-Jan-20|   G0470115|MONSOON MALABAR (...|1:15:11 pm|       1| 100|23.75|       0|           123.75|BEVERAGE|
|01-Jan-20|   G0470116| MASALA CHAI CUTTING|1:17:35 pm|       1|  40|  9.5|       0|             49.5|BEVERAGE|
|01-Jan-20|   G0470117|MINERAL WATER(100...|1:19:55 pm|       1|  50|11.88|       0|            61.88|BEVERAGE|
|01-Jan-20|   G0470283|   MOROCCAN MINT TEA|1:20:18 am|       1|  45|10.69|       0|            55.69|BEVERAGE|
+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--

In [104]:
# replacing the missing value in Item_Description column and populating with a "Not Available"
df_ocean_nonull = df_ocean_rename.na.fill("Not Available",subset=["Item_Description"])




In [111]:
df_ocean_nonull.show(5)

+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--------+
|     date|bill number|    Item_Description|      time|quantity|rate|  tax|discount|Total_Bill_Amount|category|
+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--------+
|01-Jan-20|   G0470115|MINERAL WATER(100...|1:15:11 pm|       1|  50|11.88|       0|            61.88|BEVERAGE|
|01-Jan-20|   G0470115|MONSOON MALABAR (...|1:15:11 pm|       1| 100|23.75|       0|           123.75|BEVERAGE|
|01-Jan-20|   G0470116| MASALA CHAI CUTTING|1:17:35 pm|       1|  40|  9.5|       0|             49.5|BEVERAGE|
|01-Jan-20|   G0470117|MINERAL WATER(100...|1:19:55 pm|       1|  50|11.88|       0|            61.88|BEVERAGE|
|01-Jan-20|   G0470283|   MOROCCAN MINT TEA|1:20:18 am|       1|  45|10.69|       0|            55.69|BEVERAGE|
+---------+-----------+--------------------+----------+--------+----+-----+--------+-----------------+--

In [106]:
#Grouping the records by bill number and calculating sum of total bill amount then sort them in decending order
df_max_bill_number = df_ocean_nonull.groupBy("bill number").sum("Total_Bill_Amount").sort("sum(Total_Bill_Amount)",ascending=False)




In [109]:
df_max_bill_number.show(5)

+-----------+----------------------+
|bill number|sum(Total_Bill_Amount)|
+-----------+----------------------+
|   G0470157|                365.07|
|   G0470122|                315.57|
|   G0470285|                315.56|
|   G0470260|                315.56|
|   G0470141|                235.13|
+-----------+----------------------+
only showing top 5 rows


In [113]:
#Grouping the records by Item_Description and calculating sum of total bill amount then sort them in decending order
df_max_item_description = df_ocean_nonull.groupBy("Item_Description").sum("Total_Bill_Amount").sort("sum(Total_Bill_Amount)",ascending=False)




In [114]:
df_max_item_description.show(5)

+--------------------+----------------------+
|    Item_Description|sum(Total_Bill_Amount)|
+--------------------+----------------------+
|          CAPPUCCINO|               1559.25|
|      LEMON ICED TEA|               1472.65|
|RED BULL ENERGY D...|                773.45|
|MINERAL WATER(100...|     742.5600000000001|
|         CAFFE LATTE|                519.78|
+--------------------+----------------------+
only showing top 5 rows


In [121]:
#Grouping the records by bill number, Item_Descriptio and calculating sum of total bill amount then sort them in decending order
df_max_bill_number_item_description = df_ocean_nonull.groupBy("bill number","Item_Description").sum("Total_Bill_Amount").sort("sum(Total_Bill_Amount)",ascending=False)




In [141]:
df_max_bill_number_item_description.show(5)

+-----------+--------------------+----------------------+
|bill number|    Item_Description|sum(Total_Bill_Amount)|
+-----------+--------------------+----------------------+
|   G0470260|      LEMON ICED TEA|                315.56|
|   G0470122|      LEMON ICED TEA|                210.38|
|   G0470173|      LEMON ICED TEA|                210.38|
|   G0470187| OCEAN SPECIAL CREAM|                179.44|
|   G0470221|RED BULL ENERGY D...|                154.69|
+-----------+--------------------+----------------------+
only showing top 5 rows


In [146]:
 #transforming Spark Dataframes back to Glue DynamicFrames
transform_df1 = DynamicFrame.fromDF(df_max_bill_number, glueContext, "transform_df1")
transform_df2 = DynamicFrame.fromDF(df_max_item_description, glueContext, "transform_df2")
transform_df3 = DynamicFrame.fromDF(df_max_bill_number_item_description, glueContext, "transform_df3")




# Load

#### Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [152]:
# Storing the data on s3 specified path in parquet format
sink1 = glueContext.write_dynamic_frame.from_options(frame = transform_df1, connection_type = "s3", connection_options = {"path": 's3://ocean-data-output/ocean-Transformed-data/Amount_by_billnumber'}, format = "parquet", transformation_ctx="datasink1")
sink2 = glueContext.write_dynamic_frame.from_options(frame = transform_df2, connection_type = "s3", connection_options = {"path": 's3://ocean-data-output/ocean-Transformed-data/Amount_by_item_description'}, format = "parquet", transformation_ctx="datasink2") 
sink3 = glueContext.write_dynamic_frame.from_options(frame = transform_df3, connection_type = "s3", connection_options = {"path": 's3://ocean-data-output/ocean-Transformed-data/Amount_by_billnumber_and_item_description'}, format = "parquet", transformation_ctx="datasink3")

job.commit()
        


