In [None]:
"""
-- This notebook is showing how to use AWS Glue notebook, Upload Notebook
-- how to use GLUE SQL to manipulate the data
-- work with Spark Data Frame and Glue Dynamic Frame
"""

In [1]:
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit, concat
from awsglue.dynamicframe import DynamicFrame
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 3ea40ef5-ae30-4180-8518-9416ba038a7f
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 3ea40ef5-ae30-4180-8518-9416ba038a7f to get into ready status...
Session 3ea40ef5-ae30-4180-8518-9416ba038a7f has been created.



In [8]:
s3 = boto3.client('s3')
response = s3.list_buckets()
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

  customer-project-marzieh


# Use Spark DataFrame

In [9]:
s3_path = "s3://customer-project-marzieh/input/customers.csv"
df_customer = spark.read.load(s3_path, format="csv", sep=",", inferSchema="true",header="true")
df_customer.printSchema()
df_customer.show(10)
df_customer.count()

root
 |-- Customerid: integer (nullable = true)
 |-- Firstname: string (nullable = true)
 |--  Lastname: string (nullable = true)
 |-- Fullname: string (nullable = true)


In [21]:
df_customer_select = df_customer.select(df_customer['Customerid'], df_customer['Fullname'])
df_customer_select.show(10,truncate=False)

+----------+----------------+
|Customerid|Fullname        |
+----------+----------------+
|293       |Catherine Abel  |
|295       |Kim Abercrombie |
|297       |Humberto Acevedo|
|291       |Gustavo Achong  |
|299       |Pilar Ackerman  |
|305       |Carla Adams     |
|301       |Frances Adams   |
|307       |Jay Adams       |
|309       |Ronald Adina    |
|311       |Samuel Agcaoili |
+----------+----------------+
only showing top 10 rows


In [28]:
dfRenameCol = df_customer.withColumnRenamed(" Lastname","Lastname")
dfDropCol = dfRenameCol.drop('Firstname','Lastname')
dfDropCol.show(10,truncate=False)

+----------+----------------+
|Customerid|Fullname        |
+----------+----------------+
|293       |Catherine Abel  |
|295       |Kim Abercrombie |
|297       |Humberto Acevedo|
|291       |Gustavo Achong  |
|299       |Pilar Ackerman  |
|305       |Carla Adams     |
|301       |Frances Adams   |
|307       |Jay Adams       |
|309       |Ronald Adina    |
|311       |Samuel Agcaoili |
+----------+----------------+
only showing top 10 rows


In [37]:
dfselectfilter = dfRenameCol.filter((dfRenameCol['Lastname'] == 'Kim') & (dfRenameCol['Customerid'] >= 305))
dfselectfilter.show(10,truncate=False)

+----------+---------+--------+---------+
|Customerid|Firstname|Lastname|Fullname |
+----------+---------+--------+---------+
|1233      |Jim      |Kim     |Jim Kim  |
|1231      |Joe      |Kim     |Joe Kim  |
|1235      |Shane    |Kim     |Shane Kim|
|1237      |Tim      |Kim     |Tim Kim  |
+----------+---------+--------+---------+


In [39]:
df_customer.where(df_customer["Customerid"] <300).show()

+----------+---------+-----------+----------------+
|Customerid|Firstname|   Lastname|        Fullname|
+----------+---------+-----------+----------------+
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
+----------+---------+-----------+----------------+


In [42]:
dfselectfiltergroupby = dfRenameCol.groupBy("Lastname").sum("Customerid")
dfselectfiltergroupby.show(10,truncate=False)

+--------+---------------+
|Lastname|sum(Customerid)|
+--------+---------------+
|Achong  |291            |
|Bailey  |389            |
|Caron   |637            |
|Casts   |651            |
|Curry   |765            |
|Desalvo |801            |
|Dockter |815            |
|Dyck    |841            |
|Farino  |879            |
|Fluegel |893            |
+--------+---------------+
only showing top 10 rows


In [43]:
dfselectfiltergroupbyorderby = dfselectfiltergroupby.orderBy("sum(Customerid)", ascending=False)
dfselectfiltergroupbyorderby.show(10,truncate=False)

+--------+---------------+
|Lastname|sum(Customerid)|
+--------+---------------+
|Miller  |8844           |
|Johnson |8183           |
|Thompson|5661           |
|Taylor  |5589           |
|Liu     |5336           |
|Li      |5288           |
|Lee     |5180           |
|Kim     |4936           |
|Mitchell|4449           |
|Meyer   |4371           |
+--------+---------------+
only showing top 10 rows


In [46]:
df_customer_select.write.parquet("s3://customer-project-marzieh/output/customerselect")
dfpaquet = spark.read.parquet("s3://customer-project-marzieh/output/customerselect")
dfpaquet.show(10,truncate=False)

+----------+----------------+
|Customerid|Fullname        |
+----------+----------------+
|293       |Catherine Abel  |
|295       |Kim Abercrombie |
|297       |Humberto Acevedo|
|291       |Gustavo Achong  |
|299       |Pilar Ackerman  |
|305       |Carla Adams     |
|301       |Frances Adams   |
|307       |Jay Adams       |
|309       |Ronald Adina    |
|311       |Samuel Agcaoili |
+----------+----------------+
only showing top 10 rows


# Use SQL to manipulate the data

In [50]:
dfpaquet.createOrReplaceTempView('dfpaquetsql')
spark.sql('select * from dfpaquetsql limit 10').show()
spark.sql('select Fullname from dfpaquetsql limit 10').show()
df_groupby = spark.sql('select Fullname , sum(Customerid) as SumCustomerid from dfpaquetsql group by Fullname order by SumCustomerid asc')
df_groupby.show(10,truncate=False)

+----------------+-------------+
|Fullname        |SumCustomerid|
+----------------+-------------+
|Gustavo Achong  |291          |
|Catherine Abel  |293          |
|Kim Abercrombie |295          |
|Humberto Acevedo|297          |
|Pilar Ackerman  |299          |
|Frances Adams   |301          |
|Margaret Smith  |303          |
|Carla Adams     |305          |
|Jay Adams       |307          |
|Ronald Adina    |309          |
+----------------+-------------+
only showing top 10 rows


In [2]:
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+


In [3]:
%%sql
create database if not exists customers_project

++
||
++
++


In [6]:
%%sql

CREATE EXTERNAL TABLE IF NOT EXISTS customers_project.customers_csv
(
Customerid integer, 
Firstname string, 
Lastname string, 
Fullname string
) 
STORED AS TEXTFILE
LOCATION 's3://customer-project-marzieh/inputs/customers/'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n'
TBLPROPERTIES ("skip.header.line.count"="1")

++
||
++
++


In [7]:
%%sql
SELECT * from  customers_project.customers_csv limit 10

+----------+---------+-----------+----------------+
|Customerid|Firstname|   Lastname|        Fullname|
+----------+---------+-----------+----------------+
|      null|Firstname|   Lastname|        Fullname|
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|
+----------+---------+-----------+----------------+


# Use Dynamic Frame

In [8]:
dynamicFrameCustomers = glueContext.create_dynamic_frame.from_catalog(
database = "customers_project", 
table_name = "customers_csv"
)




In [9]:
dynamicFrameCustomers.printSchema()
dynamicFrameCustomers.show(10)
dynamicFrameCustomers.count()

dyfCustomerSelectFields = dynamicFrameCustomers.select_fields(["customerid", "fullname"])
dyfCustomerSelectFields.show(10)

dyfCustomerDropFields = dynamicFrameCustomers.drop_fields(["firstname","lastname"])
dyfCustomerDropFields.show(10)

root
|-- customerid: int
|-- firstname: string
|-- lastname: string
|-- fullname: string

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoi

In [105]:
# Mapping to rename column fullname -> name
dfyMapping = ApplyMapping.apply(
                                frame = dyfCustomerDropFields, 
                                mappings = [("customerid", "long", "customerid","long"),
                                            ("fullname", "string", "name", "string")], 
                                transformation_ctx = "applymapping1"
                                )


dfyMapping.show(10)

{"customerid": 293, "name": "Catherine Abel"}
{"customerid": 295, "name": "Kim Abercrombie"}
{"customerid": 297, "name": "Humberto Acevedo"}
{"customerid": 291, "name": "Gustavo Achong"}
{"customerid": 299, "name": "Pilar Ackerman"}
{"customerid": 305, "name": "Carla Adams"}
{"customerid": 301, "name": "Frances Adams"}
{"customerid": 307, "name": "Jay Adams"}
{"customerid": 309, "name": "Ronald Adina"}
{"customerid": 311, "name": "Samuel Agcaoili"}


In [10]:
dyfFilter=  Filter.apply(frame = dynamicFrameCustomers, f = lambda x: x["lastname"] in "Adams")
dyfFilter.show(10)

{"lastname": "Adams", "firstname": "Carla", "customerid": 305, "fullname": "Carla Adams"}
{"lastname": "Adams", "firstname": "Frances", "customerid": 301, "fullname": "Frances Adams"}
{"lastname": "Adams", "firstname": "Jay", "customerid": 307, "fullname": "Jay Adams"}


In [12]:
dynamicFrameOrders = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://customer-project-marzieh/inputs/orders/"]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    })
dynamicFrameOrders.show(10)

{"SalesOrderID": "43659", "SalesOrderDetailID": "1", "OrderDate": "5/31/2011", "DueDate": "6/12/2011", "ShipDate": "6/7/2011", "EmployeeID": "279", "CustomerID": "1045", "SubTotal": "20565.6206", "TaxAmt": "1971.5149", "Freight": "616.0984", "TotalDue": "23153.2339", "ProductID": "776", "OrderQty": "1", "UnitPrice": "2024.994", "UnitPriceDiscount": "0", "LineTotal": "2024.994"}
{"SalesOrderID": "43659", "SalesOrderDetailID": "2", "OrderDate": "5/31/2011", "DueDate": "6/12/2011", "ShipDate": "6/7/2011", "EmployeeID": "279", "CustomerID": "1045", "SubTotal": "20565.6206", "TaxAmt": "1971.5149", "Freight": "616.0984", "TotalDue": "23153.2339", "ProductID": "777", "OrderQty": "3", "UnitPrice": "2024.994", "UnitPriceDiscount": "0", "LineTotal": "6074.982"}
{"SalesOrderID": "43659", "SalesOrderDetailID": "3", "OrderDate": "5/31/2011", "DueDate": "6/12/2011", "ShipDate": "6/7/2011", "EmployeeID": "279", "CustomerID": "1045", "SubTotal": "20565.6206", "TaxAmt": "1971.5149", "Freight": "616.098

In [13]:
dynamicFrameOrders = glueContext.create_dynamic_frame.from_catalog(
database = "customers_project", 
table_name = "orders"
)

dynamicFrameOrders.show(10)

{"salesorderid": 43659, "salesorderdetailid": 1, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 776, "orderqty": 1, "unitprice": 2024.994, "unitpricediscount": 0.0, "linetotal": 2024.994}
{"salesorderid": 43659, "salesorderdetailid": 2, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 777, "orderqty": 3, "unitprice": 2024.994, "unitpricediscount": 0.0, "linetotal": 6074.982}
{"salesorderid": 43659, "salesorderdetailid": 3, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 778, "orderqty": 1, "

In [37]:
dyfjoin = dynamicFrameCustomers.join("customerid","customerid",dynamicFrameOrders)
dyfjoin.show(10)




In [31]:
# Dynamic Frame to Spark DataFrame 
sparkDf = dynamicFrameCustomers.toDF()
sparkDf.show()

+----------+----------+-----------+--------------------+
|customerid| firstname|   lastname|            fullname|
+----------+----------+-----------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Albe

In [32]:
# Add new column to spark dataframe
dfNewColumn = sparkDf.withColumn("date", lit("2022-07-24"))
dfNewColumn.show()



# create another full name column
dfNewFullName = sparkDf.withColumn("new_full_name",concat("firstname",concat(lit(' '),"lastname")))
dfNewFullName.show()

+----------+----------+-----------+--------------------+----------+
|customerid| firstname|   lastname|            fullname|      date|
+----------+----------+-----------+--------------------+----------+
|       293| Catherine|       Abel|      Catherine Abel|2022-07-24|
|       295|       Kim|Abercrombie|     Kim Abercrombie|2022-07-24|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|2022-07-24|
|       291|   Gustavo|     Achong|      Gustavo Achong|2022-07-24|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|2022-07-24|
|       305|     Carla|      Adams|         Carla Adams|2022-07-24|
|       301|   Frances|      Adams|       Frances Adams|2022-07-24|
|       307|       Jay|      Adams|           Jay Adams|2022-07-24|
|       309|    Ronald|      Adina|        Ronald Adina|2022-07-24|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|2022-07-24|
|       313|     James|    Aguilar|       James Aguilar|2022-07-24|
|       315|    Robert|   Ahlering|     Robert A

In [33]:
dfOrders = glueContext.create_dynamic_frame.from_catalog(
                                        database = "customers_project", 
                                        table_name = "orders").toDF()


sparkDf.join(dfOrders,sparkDf.customerid ==  dfOrders.customerid,"inner").show(truncate=False)

+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|customerid|firstname|lastname|fullname      |salesorderid|salesorderdetailid|orderdate|duedate  |shipdate|employeeid|customerid|subtotal  |taxamt   |freight |totaldue  |productid|orderqty|unitprice|unitpricediscount|linetotal|
+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|517       |Richard  |Bready  |Richard Bready|43665       |61                |5/31/2011|6/12/2011|6/7/2011|283       |517       |14352.7713|1375.9427|429.9821|16158.6961|711      |2       |20.1865  |0.0              |40.373   |
|517       |Richard  |Bready  |Richard Bready|43665       |62                |5/31/2011|

In [39]:
#Convert from Spark Data Frame to Glue Dynamic Frame
dyfCustomersConvert = DynamicFrame.fromDF(sparkDf, glueContext, "convert")
dyfCustomersConvert.show()

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}
{"customerid": 313, "firstname": "James", "lastname": 

In [2]:
## stop the current session 
%stop_session

Stopping session: 3ea40ef5-ae30-4180-8518-9416ba038a7f
Stopped session.
