# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: a095e8ac-4c5c-41a6-a116-624842dd0bdb
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session a095e8ac-4c5c-41a6-a116-624842dd0bdb to get into ready status...
Session a095e8ac-4c5c-41a6-a116-624842dd0bdb has been created.



#### Create a DynamicFrame from a table in the AWS Glue Data Catalog and display the data


In [5]:
dyf_customers = glueContext.create_dynamic_frame.from_catalog(database='pyspark_db', table_name='customers')
dyf_customers.show(10)

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}


#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [6]:
df_customers = dyf_customers.toDF()
df_customers.show(10)

+----------+---------+-----------+----------------+
|customerid|firstname|   lastname|        fullname|
+----------+---------+-----------+----------------+
|       293|Catherine|       Abel|  Catherine Abel|
|       295|      Kim|Abercrombie| Kim Abercrombie|
|       297| Humberto|    Acevedo|Humberto Acevedo|
|       291|  Gustavo|     Achong|  Gustavo Achong|
|       299|    Pilar|   Ackerman|  Pilar Ackerman|
|       305|    Carla|      Adams|     Carla Adams|
|       301|  Frances|      Adams|   Frances Adams|
|       307|      Jay|      Adams|       Jay Adams|
|       309|   Ronald|      Adina|    Ronald Adina|
|       311|   Samuel|   Agcaoili| Samuel Agcaoili|
+----------+---------+-----------+----------------+
only showing top 10 rows


### Data transformation using DynamicFrame methods and APIs (PySpark with Glue wrapper) 


In [7]:
# Check data types in dynamic frame
dyf_customers.printSchema()

root
|-- customerid: long
|-- firstname: string
|-- lastname: string
|-- fullname: string


In [8]:
# Count The Number of Rows in a Dynamic Dataframe 
dyf_customers.count()

635


In [9]:
# Selecting certain fields from a Dynamic DataFrame
dyfCustomerSelectFields = dyf_customers.select_fields(["customerid", "fullname"])

# Show top 10
dyfCustomerSelectFields.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}


In [10]:
#Drop Fields of Dynamic Frame
dyfCustomerDropFields = dyf_customers.drop_fields(["firstname","lastname"])

# Show Top 10 rows of dyfCustomerDropFields Dynamic Frame
dyfCustomerDropFields.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}


In [12]:
# Rename fields name
newDyF = dyf_customers.rename_field("lastname", "surname")
newDyF.show(10)

{"customerid": 293, "firstname": "Catherine", "fullname": "Catherine Abel", "surname": "Abel"}
{"customerid": 295, "firstname": "Kim", "fullname": "Kim Abercrombie", "surname": "Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "fullname": "Humberto Acevedo", "surname": "Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "fullname": "Gustavo Achong", "surname": "Achong"}
{"customerid": 299, "firstname": "Pilar", "fullname": "Pilar Ackerman", "surname": "Ackerman"}
{"customerid": 305, "firstname": "Carla", "fullname": "Carla Adams", "surname": "Adams"}
{"customerid": 301, "firstname": "Frances", "fullname": "Frances Adams", "surname": "Adams"}
{"customerid": 307, "firstname": "Jay", "fullname": "Jay Adams", "surname": "Adams"}
{"customerid": 309, "firstname": "Ronald", "fullname": "Ronald Adina", "surname": "Adina"}
{"customerid": 311, "firstname": "Samuel", "fullname": "Samuel Agcaoili", "surname": "Agcaoili"}


In [11]:
# Mapping array for column rename fullname -> name
mapping=[("customerid", "long", "customerid","long"),("fullname", "string", "name", "string")]

# Apply the mapping to rename fullname -> name
dfyMapping = ApplyMapping.apply(
                                frame = dyfCustomerDropFields, 
                                mappings = mapping, 
                                transformation_ctx = "applymapping1"
                                )

# show the new dynamic frame with name column 
dfyMapping.show(10)

{"customerid": 293, "name": "Catherine Abel"}
{"customerid": 295, "name": "Kim Abercrombie"}
{"customerid": 297, "name": "Humberto Acevedo"}
{"customerid": 291, "name": "Gustavo Achong"}
{"customerid": 299, "name": "Pilar Ackerman"}
{"customerid": 305, "name": "Carla Adams"}
{"customerid": 301, "name": "Frances Adams"}
{"customerid": 307, "name": "Jay Adams"}
{"customerid": 309, "name": "Ronald Adina"}
{"customerid": 311, "name": "Samuel Agcaoili"}


In [13]:
# Filter dynamicFrameCustomers for customers who have the last name Adams
dyfFilter=  Filter.apply(frame = dyf_customers, 
                                        f = lambda x: x["lastname"] in "Adams"
                                    )

# Show the top 10 customers  from the filtered Dynamic frame 
dyfFilter.show(10)

{"lastname": "Adams", "firstname": "Carla", "customerid": 305, "fullname": "Carla Adams"}
{"lastname": "Adams", "firstname": "Frances", "customerid": 301, "fullname": "Frances Adams"}
{"lastname": "Adams", "firstname": "Jay", "customerid": 307, "fullname": "Jay Adams"}


#### Joining tables in DynamicFrame on a equality join

In [14]:
# First we need to read the second table into a dynamicframe
dyf_orders = glueContext.create_dynamic_frame.from_catalog(database='pyspark_db', table_name='orders')
dyf_customers.show(10)

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}


In [15]:
# Now we join the two dynamicframes 
dyfjoin = dyf_customers.join(["customerid"],["customerid"],dyf_orders)

# show top 10 rows for the joined dynamic 
dyfjoin.show(10)

{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1628, "productid": 754, "linetotal": 874.794, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 874.794, "orderdate": "8/1/2011", "unitpricediscount": 0.0, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1629, "productid": 760, "linetotal": 419.4589, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 419.4589, "orderdate": "8/1/2011", "unitpricediscount": 0.0, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1630, "productid": 762, "linetotal": 838.9178, "e

#### Write Down Data from a Dynamic Frame To S3

In [16]:
# write down the data in a Dynamic Frame to S3 location. 
glueContext.write_dynamic_frame.from_options(
                        frame = newDyF,
                        connection_type="s3", 
                        connection_options = {"path": "s3://{bucket_name}/{folder}"}, 
                        format = "csv", 
                        format_options={
                            "separator": ","
                            },
                        transformation_ctx = "datasink2")

<awsglue.dynamicframe.DynamicFrame object at 0x7fed6b40cf50>
