## Set up environment

In [1]:
import sys
import re
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import input_file_name, split, lit, concat, col, coalesce, expr ,year, month, dayofmonth,regexp_extract,current_timestamp, lower
from pyspark.sql.types import StructType, StructField, StringType,LongType
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import SparkSession


# Bootstrap the Glue session objects used by every later cell.
# getOrCreate() hands back the running SparkContext when one already exists.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and provides the Data Catalog readers.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# NOTE(review): `job` is not referenced by any later cell visible here — confirm it is needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 81467060-28b6-4605-adf7-9e0100b58278
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 81467060-28b6-4605-adf7-9e0100b58278 to get into ready status...
Session 81467060-28b6-4605-adf7-9e0100b58278 has been created.



In [18]:
# NOTE(review): `sc` is already bound by the setup cell; this rebind looks redundant — confirm and remove.
sc = SparkContext.getOrCreate()




## Compare row counts between raw files and bronze layer

In [19]:
# Bronze layer (brnz) - row count of the book_data table.
book_data = glueContext.create_dynamic_frame.from_catalog(
    database="brnz",
    table_name="book_data",
)
book_data.count()

212404


In [20]:
# Raw file (sri) - row count of the book_data table, to compare with bronze.
book_data_sri = glueContext.create_dynamic_frame.from_catalog(
    database="sri",
    table_name="book_data",
)
book_data_sri.count()

212404


In [21]:
# Bronze layer (brnz) - row count of the book_rating table.
book_rating = glueContext.create_dynamic_frame.from_catalog(
    database="brnz",
    table_name="book_rating",
)
book_rating.count()

3000000


In [22]:
# Raw file (sri) - row count of the book_rating table, to compare with bronze.
book_rating_sri = glueContext.create_dynamic_frame.from_catalog(
    database="sri",
    table_name="book_rating",
)
book_rating_sri.count()

3000000


In [30]:
# Bronze layer (brnz) - row count of the liquor_data table.
liquor_data = glueContext.create_dynamic_frame.from_catalog(
    database="brnz",
    table_name="liquor_data",
)
liquor_data.count()

19666763


In [23]:
# Raw file (sri) - row count of the liquor_data table, to compare with bronze.
# Bound to a `_sri`-suffixed name (like book_data_sri / book_rating_sri) so the
# bronze `liquor_data` frame is not overwritten by the raw one.
liquor_data_sri = glueContext.create_dynamic_frame.from_catalog(
    database="sri",
    table_name="liquor_data",
)
liquor_data_sri.count()

19666763


In [24]:
# Bronze layer (brnz) - row count of the sales_data table.
sales_data = glueContext.create_dynamic_frame.from_catalog(
    database="brnz",
    table_name="sales_data",
)
sales_data.count()

186849


In [25]:
# Raw file (sri) - row count of the sales table, to compare with bronze.
# Bound to a `_sri`-suffixed name (like book_data_sri / book_rating_sri) so the
# bronze `sales_data` frame is not overwritten by the raw one.
# NOTE(review): the sri table is registered as "salesdata" (no underscore),
# unlike the bronze "sales_data"; kept as-is — confirm against the catalog.
sales_data_sri = glueContext.create_dynamic_frame.from_catalog(
    database="sri",
    table_name="salesdata",
)
sales_data_sri.count()

186849


## Check Null values in gld tables

In [27]:
# Gold layer (gld) - number of book_rating rows whose `id` is NULL.
book_rating = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="book_rating",
)
null_id_count = (
    book_rating.toDF()
    .filter(F.col("id").isNull())
    .count()
)
print(null_id_count)

0


In [28]:
# Gold layer (gld) - number of sales_transaction rows whose `id` is NULL.
sales_trans = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="sales_transaction",
)
null_id_count = (
    sales_trans.toDF()
    .filter(F.col("id").isNull())
    .count()
)
print(null_id_count)

0


In [8]:
# Gold layer (gld) - number of dim_location rows whose `zip_code` is NULL.
# (The result is stored in `null_id_count` even though the column checked is zip_code.)
dim_location = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="dim_location",
)
null_id_count = (
    dim_location.toDF()
    .filter(F.col("zip_code").isNull())
    .count()
)
print(null_id_count)

0


In [11]:
# Gold layer (gld) - number of liquor_transaction rows whose `id` is NULL.
liquor_trans = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="liquor_transaction",
)
null_id_count = (
    liquor_trans.toDF()
    .filter(F.col("id").isNull())
    .count()
)
print(null_id_count)

0


## Compare column sums between brnz and gld tables

In [32]:
from pyspark.sql.functions import sum as spark_sum

# Gold layer (gld) - total of quantity_ordered across sales_transaction.
sales_trans = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="sales_transaction",
)
df = sales_trans.toDF()
# A global aggregate produces a single row with a single column.
sum_quantity = df.agg(spark_sum("quantity_ordered")).collect()[0][0]
print(sum_quantity)


209079


In [33]:
# Bronze layer (brnz) - total of quantity_ordered across sales_data.
# Bronze-specific names are used so this cell does not overwrite the gold-layer
# `sales_trans` / `df` / `sum_quantity` values, keeping both totals available
# for comparison. `F.sum` comes from the top-level `functions as F` import, so
# the cell does not rely on an import made inside another cell.
sales_data_brnz = glueContext.create_dynamic_frame.from_catalog(
    database="brnz",
    table_name="sales_data",
)
sales_data_brnz_df = sales_data_brnz.toDF()
# A global aggregate produces a single row with a single column.
sum_quantity_brnz = sales_data_brnz_df.select(F.sum("quantity_ordered")).collect()[0][0]
print(sum_quantity_brnz)

209079


## Check Duplicates in Dim table

In [34]:
# Gold layer (gld) - number of distinct zip_code values that appear in more
# than one dim_location row.
dim_location = glueContext.create_dynamic_frame.from_catalog(
    database="gld",
    table_name="dim_location",
)
duplicate_zip_count = (
    dim_location.toDF()
    .groupBy("zip_code")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
print("Duplicate zip_code count:", duplicate_zip_count)

Duplicate zip_code count: 0
