
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

## Introduction


In [1]:
#Import libraries, create Spark and Glue context, initiate spark session

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::484830312924:role/Glue-admin-role-for-RS
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: e794d591-e4e7-407f-915f-241a82702176
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session e794d591-e4e7-407f-915f-241a82702176 to get into ready status...
Session e794d591-e4e7-407f-915f-241a82702176 has been created.



## Glue DynamicFrames
### Create DynamicFrame for each of the 3 datasets

In [2]:
# Create DynamicFrame for Customer dataset
customer = glueContext.create_dynamic_frame.from_catalog(
    database = "retail_s3", 
    table_name = "customer")




In [3]:
# Print number of records in the DynamicFrame and print schema
print("Record Count: ", customer.count())
customer.printSchema()

Record Count:  4375
root
|-- customer_id: long
|-- title: string
|-- first_name: string
|-- middle_name: string
|-- last_name: string
|-- company_name: string
|-- email_address: string
|-- phone_no: string
|-- join_date: string


In [4]:
# View records from Customer dynamicframe
customer.show(10)

{"customer_id": 17850, "title": "Mr.", "first_name": "Orlando", "middle_name": "N.", "last_name": "Gee", "company_name": "A Bike Store", "email_address": "orlando0@adventure-works.com", "phone_no": "245-555-0173", "join_date": "8/1/05 0:00"}
{"customer_id": 13047, "title": "Mr.", "first_name": "Keith", "middle_name": "", "last_name": "Harris", "company_name": "Progressive Sports", "email_address": "keith0@adventure-works.com", "phone_no": "170-555-0127", "join_date": "8/1/06 0:00"}
{"customer_id": 12583, "title": "Ms.", "first_name": "Donna", "middle_name": "F.", "last_name": "Carreras", "company_name": "Advanced Bike Components", "email_address": "donna0@adventure-works.com", "phone_no": "279-555-0130", "join_date": "9/1/05 0:00"}
{"customer_id": 13748, "title": "Ms.", "first_name": "Janet", "middle_name": "M.", "last_name": "Gates", "company_name": "Modular Cycle Systems", "email_address": "janet1@adventure-works.com", "phone_no": "710-555-0173", "join_date": "7/1/06 0:00"}
{"custome

In [5]:
# Create DynamicFrame for Sales dataset
sales = glueContext.create_dynamic_frame.from_catalog(
    database = "retail_s3",
    table_name = "sales")




In [6]:
# Print number of records in the DynamicFrame and print schema
print("Record Count: ", sales.count())
sales.printSchema()

Record Count:  541909
root
|-- invoice_no: string
|-- product_id: string
|-- quantity: long
|-- invoice_date: string
|-- unit_price: double
|-- customer_id: long
|-- country: string


In [7]:
# View records from Sales dynamicframe
sales.show(10)

{"invoice_no": "536365", "product_id": "85123A", "quantity": 6, "invoice_date": "12/1/10 8:26", "unit_price": 2.55, "customer_id": 17850, "country": "United Kingdom"}
{"invoice_no": "536365", "product_id": "71053", "quantity": 6, "invoice_date": "12/1/10 8:26", "unit_price": 3.39, "customer_id": 17850, "country": "United Kingdom"}
{"invoice_no": "536365", "product_id": "84406B", "quantity": 8, "invoice_date": "12/1/10 8:26", "unit_price": 2.75, "customer_id": 17850, "country": "United Kingdom"}
{"invoice_no": "536365", "product_id": "84029G", "quantity": 6, "invoice_date": "12/1/10 8:26", "unit_price": 3.39, "customer_id": 17850, "country": "United Kingdom"}
{"invoice_no": "536365", "product_id": "84029E", "quantity": 6, "invoice_date": "12/1/10 8:26", "unit_price": 3.39, "customer_id": 17850, "country": "United Kingdom"}
{"invoice_no": "536365", "product_id": "22752", "quantity": 2, "invoice_date": "12/1/10 8:26", "unit_price": 7.65, "customer_id": 17850, "country": "United Kingdom"}


In [14]:
# Create DynamicFrame for Product dataset
product = glueContext.create_dynamic_frame.from_catalog(
    database = "retail_s3",
    table_name = "product")




In [15]:
# Print number of records in the DynamicFrame and print schema
print("Record Count: ", product.count())
product.printSchema()

Record Count:  3960
root
|-- product_id: string
|-- product_desc: string


In [16]:
# View records from Product dynamicframe
product.show(10)

{"product_id": "product_id", "product_desc": "product_desc"}
{"product_id": "85123A", "product_desc": "WHITE HANGING HEART T-LIGHT HOLDER"}
{"product_id": "71053", "product_desc": "WHITE METAL LANTERN"}
{"product_id": "84406B", "product_desc": "CREAM CUPID HEARTS COAT HANGER"}
{"product_id": "84029G", "product_desc": "KNITTED UNION FLAG HOT WATER BOTTLE"}
{"product_id": "84029E", "product_desc": "RED WOOLLY HOTTIE WHITE HEART."}
{"product_id": "22752", "product_desc": "SET 7 BABUSHKA NESTING BOXES"}
{"product_id": "21730", "product_desc": "GLASS STAR FROSTED T-LIGHT HOLDER"}
{"product_id": "22633", "product_desc": "HAND WARMER UNION JACK"}
{"product_id": "22632", "product_desc": "HAND WARMER RED POLKA DOT"}
{"product_id": "84879", "product_desc": "ASSORTED COLOUR BIRD ORNAMENT"}
{"product_id": "22745", "product_desc": "POPPY'S PLAYHOUSE BEDROOM "}
{"product_id": "22748", "product_desc": "POPPY'S PLAYHOUSE KITCHEN"}
{"product_id": "22749", "product_desc": "FELTCRAFT PRINCESS CHARLOTTE D
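The first record in the output above, whose values simply repeat the field names, suggests that the header line of the source file was catalogued as a data row. One way to drop such a row is a predicate that rejects records whose values equal their own column names. The sketch below uses plain dictionaries in place of DynamicRecords; `is_header_row` is a hypothetical helper, not part of the Glue API, and the sample values are copied from the output above.

```python
# Plain dicts stand in for DynamicRecords; values are copied from the output above.
records = [
    {"product_id": "product_id", "product_desc": "product_desc"},
    {"product_id": "85123A", "product_desc": "WHITE HANGING HEART T-LIGHT HOLDER"},
    {"product_id": "71053", "product_desc": "WHITE METAL LANTERN"},
]

def is_header_row(record):
    """Hypothetical helper: True when every value repeats its own field name."""
    return all(value == name for name, value in record.items())

# Keep only the rows that are not a repeated header.
data_rows = [r for r in records if not is_header_row(r)]
print(len(data_rows))  # 2
```

A predicate of the same shape could be passed as the `f` argument of `Filter.apply`, which this notebook uses in a later cell.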

### Perform Select, Join, Filter Operations

In [17]:
# Now that all 3 source datasets have been loaded as DynamicFrames,
# let's join them to get a combined view.
# Glue provides transforms and DynamicFrame methods such as filter, join, map and count which we can use for our data transformations.
# For more details, please refer to the Glue Developer Guide -> https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-toDF

joinedDyF = Join.apply(customer, Join.apply(sales, product, "product_id", "product_id"), "customer_id", "customer_id")
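As a rough illustration of what the nested `Join.apply` calls compute, the sketch below performs the same two equality joins on plain Python dictionaries. It assumes inner-join behaviour, meaning rows without a match on the key are dropped. `inner_join` is a hypothetical helper, not part of the Glue API, and the sample rows are taken from the outputs shown earlier.

```python
def inner_join(left, right, left_key, right_key):
    """Hypothetical helper: equality join of two lists of dicts."""
    # Index the right side by its key so each left row finds matches quickly.
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    joined = []
    for row in left:
        for match in index.get(row[left_key], []):
            joined.append({**row, **match})
    return joined

customer_rows = [
    {"customer_id": 17850, "first_name": "Orlando", "last_name": "Gee"},
    {"customer_id": 13047, "first_name": "Keith", "last_name": "Harris"},
]
sales_rows = [
    {"invoice_no": "536365", "product_id": "85123A", "quantity": 6, "customer_id": 17850},
    {"invoice_no": "536365", "product_id": "71053", "quantity": 6, "customer_id": 17850},
]
product_rows = [
    {"product_id": "85123A", "product_desc": "WHITE HANGING HEART T-LIGHT HOLDER"},
    {"product_id": "71053", "product_desc": "WHITE METAL LANTERN"},
]

# Same nesting as the cell above: sales joined to product, then customer joined to that.
sales_with_product = inner_join(sales_rows, product_rows, "product_id", "product_id")
joined_rows = inner_join(customer_rows, sales_with_product, "customer_id", "customer_id")

for row in joined_rows:
    print(row["first_name"], row["product_desc"], row["quantity"])
```

In this sample, customer 13047 has no sales rows and therefore does not appear in the joined result.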




In [18]:
# The joined DynamicFrame contains the columns from all 3 DynamicFrames, so we use the DynamicFrame method "select_fields" to keep only a selected set of fields

selectColumnsDyF = joinedDyF.select_fields(paths=["customer_id", "first_name", "last_name", "product_id", "product_desc", "quantity", "unit_price", "country", "invoice_no"])  




In [19]:
# Let's view the data and count
selectColumnsDyF.show(10)

print("Record count: ", selectColumnsDyF.count())

{"customer_id": 13835, "first_name": "Todd", "last_name": "Logan", "product_id": "21080", "product_desc": "SET/6 RED SPOTTY PAPER CUPS", "quantity": 12, "unit_price": 0.85, "country": "United Kingdom", "invoice_no": "552309"}
{"customer_id": 13835, "first_name": "Todd", "last_name": "Logan", "product_id": "21080", "product_desc": "SET/6 RED SPOTTY PAPER CUPS", "quantity": 12, "unit_price": 0.85, "country": "United Kingdom", "invoice_no": "559326"}
{"customer_id": 13835, "first_name": "Todd", "last_name": "Logan", "product_id": "47590B", "product_desc": "YELLOW METAL CHICKEN HEART ", "quantity": 3, "unit_price": 5.45, "country": "United Kingdom", "invoice_no": "552309"}
{"customer_id": 13835, "first_name": "Todd", "last_name": "Logan", "product_id": "46000S", "product_desc": "BEACH HUT KEY CABINET", "quantity": 4, "unit_price": 1.45, "country": "United Kingdom", "invoice_no": "571657"}
{"customer_id": 13835, "first_name": "Todd", "last_name": "Logan", "product_id": "21888", "product_des

In [20]:
# Now let's look at filtering the data: keep only the records which have country = "United Kingdom"
filteredDyF = Filter.apply(frame = selectColumnsDyF,
                           f = lambda x: x["country"] == "United Kingdom"
                          )
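One thing to watch in a filter predicate: on Python strings, `in` is a substring test, so `value in "United Kingdom"` is also true for values such as `"United"` or an empty string, whereas `==` matches only the exact country name. A minimal sketch, with plain dictionaries standing in for DynamicRecords (the second and third rows are made-up values for illustration):

```python
rows = [
    {"country": "United Kingdom"},
    {"country": "United"},   # made-up value
    {"country": ""},         # made-up value
    {"country": "EIRE"},
]

# Substring test: any value contained in "United Kingdom" passes.
substring_matches = [r for r in rows if r["country"] in "United Kingdom"]
# Equality test: only the exact country name passes.
exact_matches = [r for r in rows if r["country"] == "United Kingdom"]

print(len(substring_matches))  # 3
print(len(exact_matches))      # 1
```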




In [21]:
# Let's view the data and count
filteredDyF.show(10)

print("Record count: ", filteredDyF.count())

{"product_id": "21086", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "invoice_no": "552309", "quantity": 12, "last_name": "Logan"}
{"product_id": "21086", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "invoice_no": "559326", "quantity": 12, "last_name": "Logan"}
{"product_id": "22821", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "ASSORTED COLOURS SILK FAN", "invoice_no": "571657", "quantity": 12, "last_name": "Logan"}
{"product_id": "85048", "first_name": "Todd", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835, "product_desc": "CREAM SWEETHEART LETTER RACK", "invoice_no": "571657", "quantity": 2, "last_name": "Logan"}
{"product_id": "85048", "first_name": "Todd", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835,

## Drop Fields, Rename columns

In [18]:
# Drop field invoice_no
dropFieldsDyF = filteredDyF.drop_fields(["invoice_no"])
dropFieldsDyF.show(10)

{"first_name": "Todd", "product_id": "21086", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "quantity": 12, "last_name": "Logan"}
{"first_name": "Todd", "product_id": "21086", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "quantity": 12, "last_name": "Logan"}
{"first_name": "Todd", "product_id": "22821", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "ASSORTED COLOURS SILK FAN", "quantity": 12, "last_name": "Logan"}
{"first_name": "Todd", "product_id": "85048", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835, "product_desc": "CREAM SWEETHEART LETTER RACK", "quantity": 2, "last_name": "Logan"}
{"first_name": "Todd", "product_id": "85048", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835, "product_desc": "CREAM SWEETHEART LETTER RACK", "quantity": 2, "last_name": "Logan"}
{"first_na

In [19]:
# Rename column / Change Field name: quantity to qty
dropFieldsDyF.rename_field("quantity", "qty").show(10)

{"product_id": "21086", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "qty": 12, "last_name": "Logan"}
{"product_id": "21086", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "POLKADOT RAIN HAT ", "qty": 12, "last_name": "Logan"}
{"product_id": "22821", "first_name": "Todd", "unit_price": 0.65, "country": "United Kingdom", "customer_id": 13835, "product_desc": "ASSORTED COLOURS SILK FAN", "qty": 12, "last_name": "Logan"}
{"product_id": "85048", "first_name": "Todd", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835, "product_desc": "CREAM SWEETHEART LETTER RACK", "qty": 2, "last_name": "Logan"}
{"product_id": "85048", "first_name": "Todd", "unit_price": 7.95, "country": "United Kingdom", "customer_id": 13835, "product_desc": "CREAM SWEETHEART LETTER RACK", "qty": 2, "last_name": "Logan"}
{"product_id": "22355", "first_name

In [20]:
# Another way to rename a column is by applying a mapping, similar to the Glue Studio -> Actions -> ApplyMappings
# With mapping, we can also change the datatypes
mapping = [("first_name","string","first_name","string"),("last_name","string","last_name","string"),("product_id","string","product_id","string"),("product_desc","string","product_desc","string"),("unit_price","double","unit_price","double"),
           ("customer_id","long","customer_id","long"),("country","string","country","string"),("quantity","long","qty","long")]

mapDyF = ApplyMapping.apply(
    frame = dropFieldsDyF,
    mappings = mapping
)

mapDyF.show(10)

{"first_name": "Todd", "last_name": "Logan", "product_id": "21086", "product_desc": "POLKADOT RAIN HAT ", "unit_price": 0.65, "customer_id": 13835, "country": "United Kingdom", "qty": 12}
{"first_name": "Todd", "last_name": "Logan", "product_id": "21086", "product_desc": "POLKADOT RAIN HAT ", "unit_price": 0.65, "customer_id": 13835, "country": "United Kingdom", "qty": 12}
{"first_name": "Todd", "last_name": "Logan", "product_id": "22821", "product_desc": "ASSORTED COLOURS SILK FAN", "unit_price": 0.65, "customer_id": 13835, "country": "United Kingdom", "qty": 12}
{"first_name": "Todd", "last_name": "Logan", "product_id": "85048", "product_desc": "CREAM SWEETHEART LETTER RACK", "unit_price": 7.95, "customer_id": 13835, "country": "United Kingdom", "qty": 2}
{"first_name": "Todd", "last_name": "Logan", "product_id": "85048", "product_desc": "CREAM SWEETHEART LETTER RACK", "unit_price": 7.95, "customer_id": 13835, "country": "United Kingdom", "qty": 2}
{"first_name": "Todd", "last_name":
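Each mapping tuple reads as (source field, source type, target field, target type). The sketch below applies such tuples to a plain dictionary to show the rename-and-cast idea. `apply_mapping` and the `CASTS` table are hypothetical stand-ins that cover only the three types used in this notebook, and the sketch assumes that fields absent from the mapping are dropped from the output.

```python
# Hypothetical type table: maps the type names used in this notebook to Python casts.
CASTS = {"string": str, "long": int, "double": float}

def apply_mapping(record, mappings):
    """Hypothetical helper: rename and cast fields of one dict record."""
    out = {}
    for source, _source_type, target, target_type in mappings:
        if source in record:
            out[target] = CASTS[target_type](record[source])
    return out

sample_mapping = [
    ("first_name", "string", "first_name", "string"),
    ("quantity", "long", "qty", "long"),             # rename only
    ("unit_price", "double", "unit_price", "string"),  # change the datatype
]

record = {"first_name": "Todd", "quantity": 12, "unit_price": 0.65, "invoice_no": "552309"}
mapped = apply_mapping(record, sample_mapping)
print(mapped)
```

Here `quantity` is renamed to `qty`, `unit_price` becomes a string, and `invoice_no` is left out because no tuple mentions it.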

## Persist data to S3

In [21]:
# Write data to S3
glueContext.write_dynamic_frame.from_options(
    frame = mapDyF,
    connection_type = "s3",
    connection_options = {"path":"s3://redshift-spark-integration/data/tgt/"},
    format = "parquet"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f0b2b6dced0>


## Spark DataFrame
As the name suggests, Glue DynamicFrames are truly dynamic: you can convert a DynamicFrame to an Apache Spark DataFrame (DynamicRecords become DataFrame rows) and vice versa.<br>
For any given ETL job, you can take advantage of the DynamicFrame, which is designed to be memory-efficient and can help keep executor memory within safe limits, reducing the risk of "Out of memory" issues.<br>
A DynamicRecord represents a logical record in a DynamicFrame; it is self-describing and can be used for data that does not conform to a fixed schema.<br>
With these and other advantages of the DynamicFrame, you have full flexibility to convert a DynamicFrame to a DataFrame when you want to use DataFrame functionality, convert it back to a DynamicFrame, and continue with your ETL script until you write the data to one of the supported targets.
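To make "self-describing" more concrete: each record carries its own field names and values, so a schema can be derived from the data rather than declared up front. The sketch below infers a per-field set of types from plain dictionaries. `infer_schema` is a hypothetical helper, the second row is made up, and keeping every type seen for a field is only a loose analogy to how a DynamicFrame can report more than one possible type for a field.

```python
def infer_schema(records):
    """Hypothetical helper: map each field name to the set of type names seen."""
    schema = {}
    for record in records:
        for name, value in record.items():
            schema.setdefault(name, set()).add(type(value).__name__)
    return schema

records = [
    {"customer_id": 17850, "title": "Mr."},
    {"customer_id": "13047", "phone_no": "170-555-0127"},  # made-up row: id arrives as a string
]

schema = infer_schema(records)
for name in sorted(schema):
    print(name, sorted(schema[name]))
```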

In [22]:
# Now let's convert the DynamicFrame to a DataFrame
salesDF = selectColumnsDyF.toDF()




In [26]:
# Print Schema of the Spark DataFrame
salesDF.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_desc: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- country: string (nullable = true)
 |-- invoice_no: string (nullable = true)


In [23]:
# Select columns from the DataFrame; unlike a Glue DynamicFrame, a DataFrame's show() output is in tabular format
salesDF.select("customer_id","product_id","country").show()

+-----------+----------+--------------+
|customer_id|product_id|       country|
+-----------+----------+--------------+
|      13835|     22189|United Kingdom|
|      13835|     22772|United Kingdom|
|      13835|     22464|United Kingdom|
|      13835|     22666|United Kingdom|
|      13835|    51014A|United Kingdom|
|      13835|     23399|United Kingdom|
|      13835|     23407|United Kingdom|
|      13835|    72807C|United Kingdom|
|      13835|     23487|United Kingdom|
|      13835|    47591D|United Kingdom|
|      13835|     22505|United Kingdom|
|      13835|     22596|United Kingdom|
|      13835|     22307|United Kingdom|
|      13835|     21155|United Kingdom|
|      13835|     21086|United Kingdom|
|      13835|     21086|United Kingdom|
|      13835|     22998|United Kingdom|
|      13835|    47590A|United Kingdom|
|      13835|     22821|United Kingdom|
|      13835|     85048|United Kingdom|
+-----------+----------+--------------+
only showing top 20 rows


## Add, Drop and Rename column

In [24]:
# Let's try adding, dropping, renaming columns and also perform some aggregate functions on the dataframe
# We need to import additional libraries

from pyspark.sql.functions import col




In [25]:
# Add a new derived column to the DataFrame; let's call it "total_sales", derived by multiplying quantity by unit_price for each transaction

DerivedColDF = salesDF.withColumn("total_sales", col("quantity").cast("Integer") * col("unit_price").cast("Float"))
DerivedColDF.show(10)

+-----------+----------+---------+----------+--------------------+--------+----------+--------------+----------+-----------+
|customer_id|first_name|last_name|product_id|        product_desc|quantity|unit_price|       country|invoice_no|total_sales|
+-----------+----------+---------+----------+--------------------+--------+----------+--------------+----------+-----------+
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|    552309|  10.200001|
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|    559326|  10.200001|
|      13835|      Todd|    Logan|    47590B|YELLOW METAL CHIC...|       3|      5.45|United Kingdom|    552309|  16.349998|
|      13835|      Todd|    Logan|    46000S|BEACH HUT KEY CAB...|       4|      1.45|United Kingdom|    571657|        5.8|
|      13835|      Todd|    Logan|     21888|           BINGO SET|       4|      3.75|United Kingdom|    552309|       15.0|
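The trailing digits in values such as 10.200001 are consistent with 32-bit float rounding introduced by `cast("Float")`. The sketch below reproduces the effect in plain Python, using `struct` to round to 32-bit precision; `to_float32` is a hypothetical helper. Since the schema printed earlier already reports `quantity` as long and `unit_price` as double, multiplying the columns without the casts would be one way to keep 64-bit precision.

```python
import struct

def to_float32(x):
    """Hypothetical helper: round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", x))[0]

quantity, unit_price = 12, 0.85

# Emulates Integer * Float: the operand and the product are both held at 32-bit precision.
total_float32 = to_float32(quantity * to_float32(unit_price))
# Emulates long * double: everything stays at 64-bit precision.
total_float64 = quantity * unit_price

print(f"{total_float32:.6f}")  # 10.200001
print(f"{total_float64:.6f}")  # 10.200000
```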


In [38]:
# Drop the invoice number column from the DataFrame. The result is not stored in a new variable; this just shows how it's done with a Spark DataFrame
DerivedColDF.drop("invoice_no").show(10)

+-----------+----------+---------+----------+--------------------+--------+----------+--------------+-----------+
|customer_id|first_name|last_name|product_id|        product_desc|quantity|unit_price|       country|total_sales|
+-----------+----------+---------+----------+--------------------+--------+----------+--------------+-----------+
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|  10.200001|
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|  10.200001|
|      13835|      Todd|    Logan|    47590B|YELLOW METAL CHIC...|       3|      5.45|United Kingdom|  16.349998|
|      13835|      Todd|    Logan|    46000S|BEACH HUT KEY CAB...|       4|      1.45|United Kingdom|        5.8|
|      13835|      Todd|    Logan|     21888|           BINGO SET|       4|      3.75|United Kingdom|       15.0|
|      13835|      Todd|    Logan|     21936|KNITTED RABBIT DOLL |       5|      2.95|Un

In [40]:
# Rename an existing column. The result is not stored in a new variable; this just shows how it's done with a Spark DataFrame
DerivedColDF.withColumnRenamed("quantity","qty").show(10)

+-----------+----------+---------+----------+--------------------+---+----------+--------------+----------+-----------+
|customer_id|first_name|last_name|product_id|        product_desc|qty|unit_price|       country|invoice_no|total_sales|
+-----------+----------+---------+----------+--------------------+---+----------+--------------+----------+-----------+
|      13835|      Todd|    Logan|     21086|  POLKADOT RAIN HAT | 12|      0.65|United Kingdom|    552309|  7.7999997|
|      13835|      Todd|    Logan|     21086|  POLKADOT RAIN HAT | 12|      0.65|United Kingdom|    559326|  7.7999997|
|      13835|      Todd|    Logan|     22821|ASSORTED COLOURS ...| 12|      0.65|United Kingdom|    571657|  7.7999997|
|      13835|      Todd|    Logan|     85048|CREAM SWEETHEART ...|  2|      7.95|United Kingdom|    571657|       15.9|
|      13835|      Todd|    Logan|     85048|CREAM SWEETHEART ...|  2|      7.95|United Kingdom|    576346|       15.9|
|      13835|      Todd|    Logan|     2

## Aggregation

In [26]:
# Spark provides powerful functions with which we can efficiently perform complex data analytics such as aggregations and joins
# With the sales DataFrame we have, we will use aggregate functions to see total sales by customer, total sales by customer by product, and total sales by country.

# Total sales by Customer, with the aggregate column renamed
totalSalesbyCustDF = DerivedColDF.groupBy("customer_id").sum("total_sales").withColumnRenamed("sum(total_sales)", "sum_sales")
totalSalesbyCustDF.show(10)

+-----------+------------------+
|customer_id|         sum_sales|
+-----------+------------------+
|      13638|122.63999938964844|
|      13263| 7601.319960206747|
|      17979| 737.8100007176399|
|      13715|1088.2400034666061|
|      12725| 402.3000020980835|
|      14705|             179.0|
|      13098|29148.479726672173|
|      18147|179.33999729156494|
|      17102|              25.5|
|      14808|2175.3700006604195|
+-----------+------------------+
only showing top 10 rows
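Conceptually, `groupBy("customer_id").sum("total_sales")` buckets rows by key and adds up one column per bucket. A minimal pure-Python sketch of that idea, using made-up rows rather than values from the dataset:

```python
from collections import defaultdict

# Made-up rows for illustration only.
rows = [
    {"customer_id": 1, "total_sales": 10.0},
    {"customer_id": 2, "total_sales": 5.5},
    {"customer_id": 1, "total_sales": 2.5},
]

# One running total per customer_id.
sum_sales = defaultdict(float)
for row in rows:
    sum_sales[row["customer_id"]] += row["total_sales"]

print(dict(sum_sales))  # {1: 12.5, 2: 5.5}
```

In PySpark itself, the rename can also be folded into the aggregation by calling `agg` with `pyspark.sql.functions.sum("total_sales").alias("sum_sales")`, which avoids the separate `withColumnRenamed` step.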


In [27]:
# Total sales by Customer by Product

totalSalesbyCustProdDF = DerivedColDF.groupBy("customer_id", "product_id").sum("total_sales").withColumnRenamed("sum(total_sales)", "sum_sales")
totalSalesbyCustProdDF.show(10)

+-----------+----------+------------------+
|customer_id|product_id|         sum_sales|
+-----------+----------+------------------+
|      13835|    47590B|16.349998474121094|
|      13835|     23487| 9.949999809265137|
|      15400|     21878|  71.4000015258789|
|      16805|    85049E|              1.25|
|      16805|     22621|1.4500000476837158|
|      16805|     21889|              1.25|
|      14825|     20768| 2.549999952316284|
|      14825|     22219| 6.800000190734863|
|      14825|     22296|19.799999237060547|
|      13934|     22722|15.800000190734863|
+-----------+----------+------------------+
only showing top 10 rows


In [28]:
# Total sales by Country

totalSalesbyCountryDF = DerivedColDF.groupBy("country").sum("total_sales").withColumnRenamed("sum(total_sales)", "sum_sales")
totalSalesbyCountryDF.show(10)

+------------------+------------------+
|           country|         sum_sales|
+------------------+------------------+
|              EIRE|266116.73906064034|
|         Lithuania|1661.0599565505981|
|European Community|1240.7500128746033|
|            Norway| 35163.45992398262|
|             Spain|  54926.2302082628|
|           Belgium| 40893.08015060425|
|             Japan| 35340.61988157034|
|            Sweden|36595.909824848175|
|           Denmark| 18768.14003944397|
|           Finland|22397.540030241013|
+------------------+------------------+
only showing top 10 rows


## Filter Data

In [72]:
# Now let's filter the data in the DataFrame; this is more straightforward, using a where clause as in traditional SQL

#Filter data by country = "United Kingdom"
filteredDF = DerivedColDF.where("country == 'United Kingdom'")
filteredDF.show(10)

+-----------+----------+---------+----------+--------------------+--------+----------+--------------+----------+-----------+
|customer_id|first_name|last_name|product_id|        product_desc|quantity|unit_price|       country|invoice_no|total_sales|
+-----------+----------+---------+----------+--------------------+--------+----------+--------------+----------+-----------+
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|    552309|  10.200001|
|      13835|      Todd|    Logan|     21080|SET/6 RED SPOTTY ...|      12|      0.85|United Kingdom|    559326|  10.200001|
|      13835|      Todd|    Logan|    47590B|YELLOW METAL CHIC...|       3|      5.45|United Kingdom|    552309|  16.349998|
|      13835|      Todd|    Logan|    46000S|BEACH HUT KEY CAB...|       4|      1.45|United Kingdom|    571657|        5.8|
|      13835|      Todd|    Logan|     21888|           BINGO SET|       4|      3.75|United Kingdom|    552309|       15.0|


## Persist Data

In [74]:
# First option to persist data to S3

# Write dataframe directly to S3 as a Parquet file

DerivedColDF.write.parquet("s3://redshift-spark-integration/data/tgt/data")




In [31]:
# For simplicity and to write to Glue Data Catalog, we can convert the Spark DataFrame back to Glue DynamicFrame and write to either S3 directly or to Catalog

# Convert back to DynamicFrame
from awsglue.dynamicframe import DynamicFrame

DerivedColDyF = DynamicFrame.fromDF(DerivedColDF,glueContext, "convert")




In [80]:
# Write data to S3 using DynamicFrame

glueContext.write_dynamic_frame_from_options(
    frame=DerivedColDyF,
    connection_type = "s3",
    connection_options = {"path":"s3://redshift-spark-integration/data/tgt/"},
    format = "parquet")

<awsglue.dynamicframe.DynamicFrame object at 0x7f0b2b75ec50>


In [33]:
# Write data to a Glue Data Catalog table using DynamicFrame

glueContext.write_dynamic_frame.from_catalog(
    frame=DerivedColDyF,
    database = "retail_s3",
    table_name = "tgt")

<awsglue.dynamicframe.DynamicFrame object at 0x7f7f5053b1d0>
