In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%connections redshift-demo-connection

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Connections to be included:
redshift-demo-connection
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::262136919150:role/etl-RedshiftIamRole-9GTN9YKVNFXQ
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: ee649392-be44-4d6f-84d2-251fd1c4bf84
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session ee649392-be44-4d6f-84d2-251fd1c4bf84 to get into ready status...
Session ee649392-be44-4d6f-84d2-251fd1c4bf84 has been created.



#### Create a DynamicFrame from a table in the AWS Glue Data Catalog , dropping null records and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='salesdb', table_name='sales_records_csv')
dyf = DropNullFields.apply(frame=dyf)
dyf.printSchema()

null_fields []
root
|-- id: long
|-- region: string
|-- country: string
|-- item_type: string
|-- sales_channel: string
|-- order_priority: string
|-- order_date: string
|-- order_id: long
|-- ship_date: string
|-- units_sold: long
|-- unit_price: double
|-- unit_cost: double
|-- total_revenue: double
|-- total_cost: double
|-- total_profit: double


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show()

+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|      item_type|sales_channel|order_priority|order_date| order_id| ship_date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |      Baby Food|       Online|             M|12/20/2013|957081544| 1/11/2014|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|         Snacks|      Offline|             C|  7/5/2010|301644504| 7/26/2010|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe

#### Data transformations


In [6]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
sales_df = df.withColumn("Order_Date", to_date(unix_timestamp(col('order_date'), 'MM/dd/yyyy').cast('timestamp'))) \
             .withColumn("Ship_Date", to_date(unix_timestamp(col('ship_date'), 'MM/dd/yyyy').cast('timestamp')))

sales_df.show(10, True)


+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|    item_type|sales_channel|order_priority|Order_Date| order_id| Ship_Date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |    Baby Food|       Online|             M|2013-12-20|957081544|2014-01-11|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|       Snacks|      Offline|             C|2010-07-05|301644504|2010-07-26|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe|      Cze

#### Group by Region and Country and calculate aggregate metrics


In [7]:
aggregate_df = sales_df.groupBy("Region", "Country", year("order_date").alias('year'), quarter("order_date").alias('quarter')).agg(
    sum("Total_Revenue").alias("Total_Revenue_By_Region_Country"),
    sum("Total_Cost").alias("Total_Cost_By_Region_Country"),
    sum("Total_Profit").alias("Total_Profit_By_Region_Country")
)






#### Show the aggregated data


In [8]:
aggregate_df.orderBy("year","quarter").show()
aggregate_df.count()

+--------------------+--------------------+----+-------+-------------------------------+----------------------------+------------------------------+
|              Region|             Country|year|quarter|Total_Revenue_By_Region_Country|Total_Cost_By_Region_Country|Total_Profit_By_Region_Country|
+--------------------+--------------------+----+-------+-------------------------------+----------------------------+------------------------------+
|Central America a...|         Saint Lucia|2010|      1|                      304091.94|                   194197.92|                     109894.02|
|       North America|              Mexico|2010|      1|                      538028.59|                   373058.61|                     164969.98|
|Central America a...|         El Salvador|2010|      1|                      1886886.1|                  1074250.03|                     812636.07|
|Central America a...|Saint Kitts and N...|2010|      1|                       245126.7|                  

#### Renaming the cloumns and displaying the content in a sorted manner.

In [10]:
aggregate_df= aggregate_df.withColumnRenamed("Total_Revenue_By_Region_Country","Total_Revenue")\
                          .withColumnRenamed("Total_Cost_By_Region_Country","Total_Cost")\
                          .withColumnRenamed("Total_Profit_By_Region_Country","Total_Profit")
aggregate_df.orderBy("year","quarter").show()

+--------------------+--------------------+----+-------+------------------+------------------+------------------+
|              Region|             Country|year|quarter|     Total_Revenue|        Total_Cost|      Total_Profit|
+--------------------+--------------------+----+-------+------------------+------------------+------------------+
|Central America a...|  Dominican Republic|2010|      1|         307557.81|         265859.01|           41698.8|
|Central America a...|             Jamaica|2010|      1|        1573974.57|        1268828.32|         305146.25|
|       North America|              Mexico|2010|      1|         538028.59|         373058.61|         164969.98|
|              Europe|             Andorra|2010|      1|3348578.1900000004|2008080.1500000001|        1340498.04|
|Middle East and N...|                Iran|2010|      1|        2931671.66|        2239089.38|         692582.28|
|              Europe|              Kosovo|2010|      1|         677247.76|         3997

#### Convert the Spark DataFrame to a DynamicFrame and display a sample of the data

In [11]:
dyf = DynamicFrame.fromDF(aggregate_df, glueContext, "dynamic_frame")




#### Load the dynamic frame into our Amazon Redshift cluster

In [12]:
dyf.show()

{"Region": "Europe", "Country": "Luxembourg", "year": 2010, "quarter": 1, "Total_Revenue": 1123251.46, "Total_Cost": 662970.63, "Total_Profit": 460280.83}
{"Region": "Europe", "Country": "Switzerland", "year": 2014, "quarter": 1, "Total_Revenue": 4429651.8, "Total_Cost": 2873705.61, "Total_Profit": 1555946.19}
{"Region": "Central America and the Caribbean", "Country": "Dominica", "year": 2010, "quarter": 2, "Total_Revenue": 1255966.53, "Total_Cost": 1085682.13, "Total_Profit": 170284.4}
{"Region": "Australia and Oceania", "Country": "Federated States of Micronesia", "year": 2012, "quarter": 2, "Total_Revenue": 5588354.33, "Total_Cost": 4029198.36, "Total_Profit": 1559155.97}
{"Region": "Europe", "Country": "Poland", "year": 2015, "quarter": 4, "Total_Revenue": 5740416.15, "Total_Cost": 4627522.4, "Total_Profit": 1112893.75}
{"Region": "Sub-Saharan Africa", "Country": "Namibia", "year": 2016, "quarter": 1, "Total_Revenue": 1861809.39, "Total_Cost": 1500860.64, "Total_Profit": 360948.75}

In [13]:
redshift_output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dyf,
    catalog_connection="redshift-demo-connection",
    connection_options={"dbtable": "public.Regionalsales","database":"dev"},
    redshift_tmp_dir = "s3://aws-glue-assets-262136919150-us-east-1/temporary/",
    transformation_ctx = "redshift_output"
)




