# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [3]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session baf66c07-49c3-4048-b8f9-ecf754730468.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session baf66c07-49c3-4048-b8f9-ecf754730468.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session baf66c07-49c3-4048-b8f9-ecf754730468.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session baf66c07-49c3-4048-b8f9-ecf754730468.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [5]:
# Read from the customers table in the glue data catalog using a dynamic frame
dynamicFrameCustomers = glueContext.create_dynamic_frame.from_catalog(
database = "mygludatabse",
table_name = "customers"
)
# Show the top 10 rows from the dynamic dataframe
dynamicFrameCustomers.show(10)

{"col0": ""}
{"col0": "Customerid", "col1": "Firstname", "col2": "Lastname", "col3": "Fullname"}
{"col0": "293", "col1": "Catherine", "col2": "Abel", "col3": "Catherine Abel"}
{"col0": "295", "col1": "Kim", "col2": "Abercrombie", "col3": "Kim Abercrombie"}
{"col0": "297", "col1": "Humberto", "col2": "Acevedo", "col3": "Humberto Acevedo"}
{"col0": "291", "col1": "Gustavo", "col2": "Achong", "col3": "Gustavo Achong"}
{"col0": "299", "col1": "Pilar", "col2": "Ackerman", "col3": "Pilar Ackerman"}
{"col0": "305", "col1": "Carla", "col2": "Adams", "col3": "Carla Adams"}
{"col0": "301", "col1": "Frances", "col2": "Adams", "col3": "Frances Adams"}
{"col0": "307", "col1": "Jay", "col2": "Adams", "col3": "Jay Adams"}


In [6]:
# Check types in dynamic frame
dynamicFrameCustomers.printSchema()

root
|-- col0: string
|-- col1: string
|-- col2: string
|-- col3: string


In [7]:
# Count The Number of Rows in a Dynamic Dataframe
dynamicFrameCustomers.count()

637


In [8]:
# Selecting certain fields from a Dynamic DataFrame
dyfCustomerSelectFields = dynamicFrameCustomers.select_fields(["customerid",
"fullname"])
# Show top 10
dyfCustomerSelectFields.show(10)


{}
{}
{}
{}
{}
{}
{}
{}
{}
{}


In [10]:
#Drop Fields of Dynamic Frame
dyfCustomerDropFields = dynamicFrameCustomers.drop_fields(["firstname","lastname"])
# Show Top 10 rows of dyfCustomerDropFields Dynamic Frame
dyfCustomerDropFields.show(10)


{"col0": ""}
{"col0": "Customerid", "col1": "Firstname", "col2": "Lastname", "col3": "Fullname"}
{"col0": "293", "col1": "Catherine", "col2": "Abel", "col3": "Catherine Abel"}
{"col0": "295", "col1": "Kim", "col2": "Abercrombie", "col3": "Kim Abercrombie"}
{"col0": "297", "col1": "Humberto", "col2": "Acevedo", "col3": "Humberto Acevedo"}
{"col0": "291", "col1": "Gustavo", "col2": "Achong", "col3": "Gustavo Achong"}
{"col0": "299", "col1": "Pilar", "col2": "Ackerman", "col3": "Pilar Ackerman"}
{"col0": "305", "col1": "Carla", "col2": "Adams", "col3": "Carla Adams"}
{"col0": "301", "col1": "Frances", "col2": "Adams", "col3": "Frances Adams"}
{"col0": "307", "col1": "Jay", "col2": "Adams", "col3": "Jay Adams"}


In [11]:
# Mapping array for column rename fullname -> name
mapping=[("customerid", "string", "customerid","string"),("fullname", "string",
"name", "string")]
# Apply the mapping to rename fullname -> name
dfyMapping = ApplyMapping.apply(
frame = dyfCustomerDropFields,
mappings = mapping,
transformation_ctx = "applymapping1"
)
# show the new dynamic frame with name column
dfyMapping.show(10)

{}
{}
{}
{}
{}
{}
{}
{}
{}
{}


In [12]:
# Filter dynamicFrameCustomers for customers who have the last name Adams
dyfFilter= Filter.apply(frame = dynamicFrameCustomers,
f = lambda x: x["lastname"] in "Adams"
)
# Show the top 10 customers from the filtered Dynamic frame
dyfFilter.show(10)




In [14]:
# Read from the customers table in the glue data catalog using a dynamic frame
dynamicFrameOrders = glueContext.create_dynamic_frame.from_catalog(
database = "mygludatabse",
table_name = "orders"
)
# show top 10 rows of orders table
dynamicFrameOrders.show(10)


{"col0": "SalesOrderID", "col1": "SalesOrderDetailID", "col2": "OrderDate", "col3": "DueDate", "col4": "ShipDate", "col5": "EmployeeID", "col6": "CustomerID", "col7": "SubTotal", "col8": "TaxAmt", "col9": "Freight", "col10": "TotalDue", "col11": "ProductID", "col12": "OrderQty", "col13": "UnitPrice", "col14": "UnitPriceDiscount", "col15": "LineTotal"}
{"col0": "43659", "col1": "1", "col2": "5/31/2011", "col3": "6/12/2011", "col4": "6/7/2011", "col5": "279", "col6": "1045", "col7": "20565.6206", "col8": "1971.5149", "col9": "616.0984", "col10": "23153.2339", "col11": "776", "col12": "1", "col13": "2024.994", "col14": "0", "col15": "2024.994"}
{"col0": "43659", "col1": "2", "col2": "5/31/2011", "col3": "6/12/2011", "col4": "6/7/2011", "col5": "279", "col6": "1045", "col7": "20565.6206", "col8": "1971.5149", "col9": "616.0984", "col10": "23153.2339", "col11": "777", "col12": "3", "col13": "2024.994", "col14": "0", "col15": "6074.982"}
{"col0": "43659", "col1": "3", "col2": "5/31/2011", "c

In [15]:
# Join two dynamic frames on an equality join
dyfjoin = dynamicFrameCustomers.join(["customerid"],["customerid"],dynamicFrameOrders)
# show top 10 rows for the joined dynamic
dyfjoin.show(10)

{"col4": "ShipDate", "col13": "UnitPrice", "col7": "SubTotal", "col10": "TotalDue", "col1": "SalesOrderDetailID", "col9": "Freight", "col3": "DueDate", "col12": "OrderQty", "col6": "CustomerID", "col15": "LineTotal", "col0": "SalesOrderID", ".col0": "", "col14": "UnitPriceDiscount", "col8": "TaxAmt", "col2": "OrderDate", "col11": "ProductID", "col5": "EmployeeID"}
{"col4": "6/7/2011", "col13": "2024.994", "col7": "20565.6206", "col10": "23153.2339", "col1": "1", "col9": "616.0984", "col3": "6/12/2011", "col12": "1", "col6": "1045", "col15": "2024.994", "col0": "43659", ".col0": "", "col14": "0", "col8": "1971.5149", "col2": "5/31/2011", "col11": "776", "col5": "279"}
{"col4": "6/7/2011", "col13": "2024.994", "col7": "20565.6206", "col10": "23153.2339", "col1": "2", "col9": "616.0984", "col3": "6/12/2011", "col12": "3", "col6": "1045", "col15": "6074.982", "col0": "43659", ".col0": "", "col14": "0", "col8": "1971.5149", "col2": "5/31/2011", "col11": "777", "col5": "279"}
{"col4": "6/7/2

In [16]:
# Write down the data in a Dynamic Frame to S3 location
output_path = "s3://myawsgluebkt/ write_down_dyf_to_s3/"

glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrameCustomers,
    connection_type="s3",
    connection_options={"path": output_path},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink2"
)


<awsglue.dynamicframe.DynamicFrame object at 0x7feaa79fb3a0>


In [18]:
# write data from the dynamicFrameCustomers to customers_write_dyf table usingthe meta data stored in the glue data catalog
glueContext.write_dynamic_frame.from_catalog(
frame=dynamicFrameCustomers,
database = "mygludatabse",
table_name = "customers_write_dyf"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7feaa79f8760>


In [19]:
# Dynamic Frame to Spark DataFrame
sparkDf = dynamicFrameCustomers.toDF()
#show spark DF
sparkDf.show()

+----------+---------+-----------+----------------+
|      col0|     col1|       col2|            col3|
+----------+---------+-----------+----------------+
|          |     null|       null|            null|
|Customerid|Firstname|   Lastname|        Fullname|
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|
|       311|   Samuel|   Agcaoili| Samuel Agcaoili|
|       313|    James|    Aguilar|   James Aguilar|
|       315|   Robert|   Ahlering| Robert Ahlering|
|       319|      Kim|      Akers|       Kim Akers|
|       441|  Stanley|       Alan|    Stanley Alan|
|       323|

In [22]:
# Select columns from spark dataframe
dfSelect = sparkDf.select("col0","col3")
# show selected
dfSelect.show()

+----------+----------------+
|      col0|            col3|
+----------+----------------+
|          |            null|
|Customerid|        Fullname|
|       293|  Catherine Abel|
|       295| Kim Abercrombie|
|       297|Humberto Acevedo|
|       291|  Gustavo Achong|
|       299|  Pilar Ackerman|
|       305|     Carla Adams|
|       301|   Frances Adams|
|       307|       Jay Adams|
|       309|    Ronald Adina|
|       311| Samuel Agcaoili|
|       313|   James Aguilar|
|       315| Robert Ahlering|
|       319|       Kim Akers|
|       441|    Stanley Alan|
|       323|     Amy Alberts|
|       325|   Anna Albright|
|       327|   Milton Albury|
|       329|     Paul Alcorn|
+----------+----------------+
only showing top 20 rows


In [23]:
#import lit from sql functions
from pyspark.sql.functions import lit
# Add new column to spark dataframe
dfNewColumn = sparkDf.withColumn("date", lit("2022-07-24"))
# show df with new column
dfNewColumn.show()

+----------+---------+-----------+----------------+----------+
|      col0|     col1|       col2|            col3|      date|
+----------+---------+-----------+----------------+----------+
|          |     null|       null|            null|2022-07-24|
|Customerid|Firstname|   Lastname|        Fullname|2022-07-24|
|       293|Catherine|       Abel|  Catherine Abel|2022-07-24|
|       295|      Kim|Abercrombie| Kim Abercrombie|2022-07-24|
|       297| Humberto|    Acevedo|Humberto Acevedo|2022-07-24|
|       291|  Gustavo|     Achong|  Gustavo Achong|2022-07-24|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|2022-07-24|
|       305|    Carla|      Adams|     Carla Adams|2022-07-24|
|       301|  Frances|      Adams|   Frances Adams|2022-07-24|
|       307|      Jay|      Adams|       Jay Adams|2022-07-24|
|       309|   Ronald|      Adina|    Ronald Adina|2022-07-24|
|       311|   Samuel|   Agcaoili| Samuel Agcaoili|2022-07-24|
|       313|    James|    Aguilar|   James Aguilar|2022

In [26]:
#import concat from functions
from pyspark.sql.functions import concat
# create another full name column
dfNewFullName = sparkDf.withColumn("new_full_name",concat("col1",concat(lit(' '),"col2")))
#show full name column
dfNewFullName.show()

+----------+---------+-----------+----------------+------------------+
|      col0|     col1|       col2|            col3|     new_full_name|
+----------+---------+-----------+----------------+------------------+
|          |     null|       null|            null|              null|
|Customerid|Firstname|   Lastname|        Fullname|Firstname Lastname|
|       293|Catherine|       Abel|  Catherine Abel|    Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|   Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|  Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|    Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|    Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|       Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|     Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|         Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|      Ronald Adina|
|     

In [27]:
# Drop column from spark dataframe
dfDropCol = sparkDf.drop("firstname","lastname")
#show dropped column df
dfDropCol.show()

+----------+---------+-----------+----------------+
|      col0|     col1|       col2|            col3|
+----------+---------+-----------+----------------+
|          |     null|       null|            null|
|Customerid|Firstname|   Lastname|        Fullname|
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|
|       311|   Samuel|   Agcaoili| Samuel Agcaoili|
|       313|    James|    Aguilar|   James Aguilar|
|       315|   Robert|   Ahlering| Robert Ahlering|
|       319|      Kim|      Akers|       Kim Akers|
|       441|  Stanley|       Alan|    Stanley Alan|
|       323|

In [28]:
# Rename column in Spark dataframe
dfRenameCol = sparkDf.withColumnRenamed("fullname","full_name")
#show renamed column dataframe
dfRenameCol.show()

+----------+---------+-----------+----------------+
|      col0|     col1|       col2|            col3|
+----------+---------+-----------+----------------+
|          |     null|       null|            null|
|Customerid|Firstname|   Lastname|        Fullname|
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|
|       311|   Samuel|   Agcaoili| Samuel Agcaoili|
|       313|    James|    Aguilar|   James Aguilar|
|       315|   Robert|   Ahlering| Robert Ahlering|
|       319|      Kim|      Akers|       Kim Akers|
|       441|  Stanley|       Alan|    Stanley Alan|
|       323|

In [30]:
# Group by lastname then print counts of lastname and show
sparkDf.groupBy("col2").count().show()


+--------+-----+
|    col2|count|
+--------+-----+
|  Achong|    1|
|  Bailey|    1|
|   Caron|    1|
|   Casts|    1|
|   Curry|    1|
| Desalvo|    1|
| Dockter|    1|
|    Dyck|    1|
|  Farino|    1|
| Fluegel|    1|
|   Ganio|    1|
|   Gimmi|    1|
|Gonzales|    2|
|   Graff|    1|
|   Groth|    1|
|    Hass|    1|
| Hassall|    1|
|Hillmann|    1|
| Hodgson|    1|
|   Ihrig|    1|
+--------+-----+
only showing top 20 rows


In [32]:
# Filter spark DataFrame for customers who have the last name Adams
sparkDf.filter(sparkDf["col2"] == "Adams").show()

+----+-------+-----+-------------+
|col0|   col1| col2|         col3|
+----+-------+-----+-------------+
| 305|  Carla|Adams|  Carla Adams|
| 301|Frances|Adams|Frances Adams|
| 307|    Jay|Adams|    Jay Adams|
+----+-------+-----+-------------+


In [34]:
# Where clause spark DataFrame for customers who have the last name Adams
sparkDf.where("col2 =='Adams'").show()

+----+-------+-----+-------------+
|col0|   col1| col2|         col3|
+----+-------+-----+-------------+
| 305|  Carla|Adams|  Carla Adams|
| 301|Frances|Adams|Frances Adams|
| 307|    Jay|Adams|    Jay Adams|
+----+-------+-----+-------------+


In [37]:
# Read from the customers table in the glue data catalog using a dynamic frame and convert to spark dataframe
dfOrders = glueContext.create_dynamic_frame.from_catalog(
database = "mygludatabse",
table_name = "orders"
).toDF()




In [41]:
# Inner Join Customers Spark DF to Orders Spark DF
sparkDf.join(dfOrders,sparkDf.customerid ==
dfOrders.Customerid,"inner").show(truncate=False)

AttributeError: 'DataFrame' object has no attribute 'customerid'


In [42]:
#Get customers that only have surname Adams
dfAdams = sparkDf.where("lastname =='Adams'")
# inner join on Adams DF and orders
dfAdams.join(dfOrders,dfAdams.customerid ==
dfOrders.customerid,"inner").show()

AnalysisException: Column 'lastname' does not exist. Did you mean one of the following? [col0, col1, col2, col3]; line 1 pos 0;
'Filter ('lastname = Adams)
+- LogicalRDD [col0#0, col1#1, col2#2, col3#3], false



In [43]:
#left join on orders and adams df
dfOrders.join(dfAdams,dfAdams.customerid ==
dfOrders.customerid,"left").show(100)

NameError: name 'dfAdams' is not defined
