# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: a727c04f-e6bb-483a-ad3f-418e029c271a
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session a727c04f-e6bb-483a-ad3f-418e029c271a to get into ready status...
Session a727c04f-e6bb-483a-ad3f-418e029c271a ha

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
# Read the customers table from the AWS Glue Data Catalog into a DynamicFrame
dynamicFrameCustomers = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="customers"
)

# Show the first 10 rows of the DynamicFrame
dynamicFrameCustomers.show(10)

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}


In [3]:
# Print Schema
dynamicFrameCustomers.printSchema()

root
|-- customerid: long
|-- firstname: string
|-- lastname: string
|-- fullname: string


In [4]:
# Count the number of records in the DynamicFrame
dynamicFrameCustomers.count()

635


In [6]:
# Selecting customerid and fullname from Dynamic Frame
dyfSelectCustomers = dynamicFrameCustomers.select_fields(["customerid", "fullname"])

# Show the results
dyfSelectCustomers.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}


In [7]:
# Select and show in a single line (show() with no argument prints 20 rows)
dynamicFrameCustomers.select_fields(["customerid", "fullname"]).show()

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}
{"customerid": 313, "fullname": "James Aguilar"}
{"customerid": 315, "fullname": "Robert Ahlering"}
{"customerid": 319, "fullname": "Kim Akers"}
{"customerid": 441, "fullname": "Stanley Alan"}
{"customerid": 323, "fullname": "Amy Alberts"}
{"customerid": 325, "fullname": "Anna Albright"}
{"customerid": 327, "fullname": "Milton Albury"}
{"customerid": 329, "fullname": "Paul Alcorn"}
{"customerid": 331, "fullname": "Gregory Alderson"}
{"customerid": 333, "fullname": "J. Phillip Alexander"}


In [8]:
# Drop fields instead of selecting them
dyfCustomerDropFields = dynamicFrameCustomers.drop_fields(["firstname", "lastname"])

# Show the results: firstname and lastname are no longer present
dyfCustomerDropFields.show(10)

{"customerid": 293, "fullname": "Catherine Abel"}
{"customerid": 295, "fullname": "Kim Abercrombie"}
{"customerid": 297, "fullname": "Humberto Acevedo"}
{"customerid": 291, "fullname": "Gustavo Achong"}
{"customerid": 299, "fullname": "Pilar Ackerman"}
{"customerid": 305, "fullname": "Carla Adams"}
{"customerid": 301, "fullname": "Frances Adams"}
{"customerid": 307, "fullname": "Jay Adams"}
{"customerid": 309, "fullname": "Ronald Adina"}
{"customerid": 311, "fullname": "Samuel Agcaoili"}
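On flat records like these, `select_fields` and `drop_fields` are two routes to the same result: one keeps the listed fields, the other removes them. The plain-Python sketch below models each record as a dict to show the equivalence; the helpers `select_fields` and `drop_fields` here are hypothetical stand-ins, not Glue's implementation.

```python
# Plain-Python model of two DynamicFrame field operations on flat records.
# These helpers are hypothetical illustrations, not Glue's implementation.

def select_fields(records, fields):
    """Keep only the listed fields in each record."""
    return [{k: r[k] for k in fields if k in r} for r in records]

def drop_fields(records, fields):
    """Remove the listed fields from each record."""
    drop = set(fields)
    return [{k: v for k, v in r.items() if k not in drop} for r in records]

customers = [
    {"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"},
    {"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"},
]

selected = select_fields(customers, ["customerid", "fullname"])
dropped = drop_fields(customers, ["firstname", "lastname"])

print(selected == dropped)  # True: both leave only customerid and fullname
```

Selecting is the safer choice when the schema may gain new fields later, since dropping keeps anything you did not list.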


In [None]:
# Columns can also be renamed inside a DynamicFrame.
# Each mapping is (source name, source type, target name, target type):
# keep customerid as is and rename fullname -> name
mapping = [
    ("customerid", "long", "customerid", "long"),
    ("fullname", "string", "name", "string"),
]

# Apply the mapping to produce a DynamicFrame with the renamed column
dyfMapping = ApplyMapping.apply(
    frame=dyfCustomerDropFields,
    mappings=mapping,
    transformation_ctx="applymapping1"
)

# Show the new DynamicFrame with the name column
dyfMapping.show(10)
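Each `ApplyMapping` tuple reads `(source_name, source_type, target_name, target_type)`, and fields that are not listed in the mappings are not carried into the output. The sketch below models that behavior on plain dicts; `apply_mapping` and the small `CASTS` type table are hypothetical, and Glue's real type conversion rules are richer than this.

```python
# Hypothetical plain-Python model of ApplyMapping's tuple format:
# (source_name, source_type, target_name, target_type).
# Only mapped fields appear in the output; casting is deliberately minimal.

CASTS = {"long": int, "string": str}  # assumed subset of Glue type names

def apply_mapping(records, mappings):
    out = []
    for r in records:
        new = {}
        for src, _src_type, dst, dst_type in mappings:
            if src in r:
                new[dst] = CASTS[dst_type](r[src])
        out.append(new)
    return out

mapping = [("customerid", "long", "customerid", "long"),
           ("fullname", "string", "name", "string")]

rows = [{"customerid": 293, "fullname": "Catherine Abel"}]
print(apply_mapping(rows, mapping))
# [{'customerid': 293, 'name': 'Catherine Abel'}]
```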

In [9]:
# Filter the DynamicFrame to customers whose last name is exactly Adams
dyfFilter = Filter.apply(
    frame=dynamicFrameCustomers,
    f=lambda x: x["lastname"] == "Adams"
)

# Show the results
dyfFilter.show()

{"lastname": "Adams", "firstname": "Carla", "customerid": 305, "fullname": "Carla Adams"}
{"lastname": "Adams", "firstname": "Frances", "customerid": 301, "fullname": "Frances Adams"}
{"lastname": "Adams", "firstname": "Jay", "customerid": 307, "fullname": "Jay Adams"}
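A `Filter` predicate written as `x["lastname"] in "Adams"` would be a substring test on the string `"Adams"`, not an equality check. It happens to return the same three rows for this data, but any last name that is a substring of "Adams" (such as "Ada", or an empty string) would also pass. Plain Python shows the difference:

```python
# Why a Filter predicate should compare with == rather than `in`.
# `a in b` on two strings asks "is a a substring of b?", not "is a equal to b?".
last_names = ["Adams", "Ada", "Dam", "", "Adamson"]

substring_matches = [n for n in last_names if n in "Adams"]
exact_matches = [n for n in last_names if n == "Adams"]

print(substring_matches)  # ['Adams', 'Ada', '']
print(exact_matches)      # ['Adams']
```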


In [10]:
# Read the orders table from the AWS Glue Data Catalog into a DynamicFrame
dynamicFrameOrders = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="orders"
)

# Show the first 10 rows of the orders table
dynamicFrameOrders.show(10)

{"salesorderid": 43659, "salesorderdetailid": 1, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 776, "orderqty": 1, "unitprice": 2024.9940, "unitpricediscount": 0.0000, "linetotal": 2024.9940}
{"salesorderid": 43659, "salesorderdetailid": 2, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 777, "orderqty": 3, "unitprice": 2024.9940, "unitpricediscount": 0.0000, "linetotal": 6074.9820}
{"salesorderid": 43659, "salesorderdetailid": 3, "orderdate": "5/31/2011", "duedate": "6/12/2011", "shipdate": "6/7/2011", "employeeid": 279, "customerid": 1045, "subtotal": 20565.6206, "taxamt": 1971.5149, "freight": 616.0984, "totaldue": 23153.2339, "productid": 778, "order

In [11]:
# Inner join the DynamicFrames on customerid: join(paths1, paths2, frame2)
# The output contains both customerid and .customerid
dyfjoin = dynamicFrameCustomers.join(["customerid"], ["customerid"], dynamicFrameOrders)
dyfjoin.show(10)

{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1628, "productid": 754, "linetotal": 874.7940, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 874.7940, "orderdate": "8/1/2011", "unitpricediscount": 0.0000, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1629, "productid": 760, "linetotal": 419.4589, "employeeid": 277, ".customerid": 671, "taxamt": 579.2061, "salesorderid": 44097, "duedate": "8/13/2011", "orderqty": 1, "shipdate": "8/8/2011", "lastname": "Chapla", "firstname": "Lee", "totaldue": 6796.0326, "unitprice": 419.4589, "orderdate": "8/1/2011", "unitpricediscount": 0.0000, "customerid": 671, "fullname": "Lee Chapla"}
{"freight": 181.0019, "subtotal": 6035.8246, "salesorderdetailid": 1630, "productid": 762, "linetotal": 838.
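`DynamicFrame.join(paths1, paths2, frame2)` performs an equality join on the listed key fields, and in the output above one copy of the key appears as `.customerid`. The sketch below models that inner equi-join on plain dicts using a hash index. The helper `inner_join` is hypothetical, and the rule that colliding right-side names get a `.` prefix is an assumption inferred from the output shown here, not taken from Glue's source.

```python
# Hypothetical model of an inner equi-join on one key, as in
# dynamicFrameCustomers.join(["customerid"], ["customerid"], dynamicFrameOrders).
# Colliding right-side field names get a "." prefix, mirroring the
# ".customerid" column in the notebook output (assumed, not Glue's code).

def inner_join(left, right, key):
    # Build a hash index of right-hand records by join key.
    index = {}
    for rrow in right:
        index.setdefault(rrow[key], []).append(rrow)
    joined = []
    for lrow in left:
        # Rows with no match on the other side are dropped (inner join).
        for rrow in index.get(lrow[key], []):
            row = dict(lrow)
            for k, v in rrow.items():
                row["." + k if k in row else k] = v
            joined.append(row)
    return joined

customers = [{"customerid": 671, "fullname": "Lee Chapla"},
             {"customerid": 293, "fullname": "Catherine Abel"}]
orders = [{"customerid": 671, "salesorderid": 44097},
          {"customerid": 999, "salesorderid": 50000}]

print(inner_join(customers, orders, "customerid"))
# [{'customerid': 671, 'fullname': 'Lee Chapla', '.customerid': 671, 'salesorderid': 44097}]
```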

In [12]:
# How do we write back to S3?
# The Glue libraries can write directly to S3; the target bucket must already exist
glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrameCustomers,
    connection_type="s3",
    connection_options={"path": "s3://luis-delotavo-pyspark-tutorial/write_down_dyf_to_s3/"},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink2"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f513b190090>
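With `format="csv"` and `format_options={"separator": ","}`, each record becomes one comma-separated line in the files written under the S3 prefix. The stdlib `csv` sketch below shows roughly what those lines look like for two customer records. Whether Glue writes a header row and how it quotes values are governed by other format options, so treat this as an approximation rather than Glue's exact output.

```python
import csv
import io

# Approximate the CSV text that a comma-separated sink produces.
# Header handling and quoting in Glue are controlled by other format_options,
# so this is an illustration, not Glue's exact output.
records = [
    {"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"},
    {"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"},
]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(records[0]), delimiter=",", lineterminator="\n")
writer.writeheader()
writer.writerows(records)
print(buf.getvalue())
```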


In [13]:
# Write the DynamicFrame to a Data Catalog table; the table must already exist
glueContext.write_dynamic_frame.from_catalog(
    frame=dynamicFrameCustomers,
    database="pyspark_tutorial_db",
    table_name="customers_write_dyf"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f513b190fd0>


In [14]:
# DynamicFrame is Glue's own abstraction; toDF() converts it to a Spark DataFrame,
# whose API offers a wider range of transformations than DynamicFrame
sparkDF = dynamicFrameCustomers.toDF()
sparkDF.show()

+----------+----------+-----------+--------------------+
|customerid| firstname|   lastname|            fullname|
+----------+----------+-----------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Albe

In [16]:
# Selecting Columns
dfSelect = sparkDF.select("customerid", "fullname")

# Show
dfSelect.show()

+----------+--------------------+
|customerid|            fullname|
+----------+--------------------+
|       293|      Catherine Abel|
|       295|     Kim Abercrombie|
|       297|    Humberto Acevedo|
|       291|      Gustavo Achong|
|       299|      Pilar Ackerman|
|       305|         Carla Adams|
|       301|       Frances Adams|
|       307|           Jay Adams|
|       309|        Ronald Adina|
|       311|     Samuel Agcaoili|
|       313|       James Aguilar|
|       315|     Robert Ahlering|
|       319|           Kim Akers|
|       441|        Stanley Alan|
|       323|         Amy Alberts|
|       325|       Anna Albright|
|       327|       Milton Albury|
|       329|         Paul Alcorn|
|       331|    Gregory Alderson|
|       333|J. Phillip Alexander|
+----------+--------------------+
only showing top 20 rows


In [17]:
# Add a new column holding a literal value.
# Some operations exist on DynamicFrames but not on Spark DataFrames, and vice versa.

# Import the lit function
from pyspark.sql.functions import lit

# Add a date column with the same literal value in every row
dfNewColumn = sparkDF.withColumn("date", lit("2022-07-25"))

# Show the result
dfNewColumn.show()

+----------+----------+-----------+--------------------+----------+
|customerid| firstname|   lastname|            fullname|      date|
+----------+----------+-----------+--------------------+----------+
|       293| Catherine|       Abel|      Catherine Abel|2022-07-25|
|       295|       Kim|Abercrombie|     Kim Abercrombie|2022-07-25|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|2022-07-25|
|       291|   Gustavo|     Achong|      Gustavo Achong|2022-07-25|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|2022-07-25|
|       305|     Carla|      Adams|         Carla Adams|2022-07-25|
|       301|   Frances|      Adams|       Frances Adams|2022-07-25|
|       307|       Jay|      Adams|           Jay Adams|2022-07-25|
|       309|    Ronald|      Adina|        Ronald Adina|2022-07-25|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|2022-07-25|
|       313|     James|    Aguilar|       James Aguilar|2022-07-25|
|       315|    Robert|   Ahlering|     Robert A

In [19]:
# Create a new column by concatenating firstname, a space, and lastname
from pyspark.sql.functions import concat, lit

dfnewFullname = sparkDF.withColumn("new_full_name", concat("firstname", lit(" "), "lastname"))

dfnewFullname.show()

+----------+----------+-----------+--------------------+--------------------+
|customerid| firstname|   lastname|            fullname|       new_full_name|
+----------+----------+-----------+--------------------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|     Sam
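Spark's `concat` returns NULL if any of its inputs is NULL, whereas `concat_ws(sep, ...)` skips NULL inputs. If `firstname` or `lastname` can be missing, `concat_ws(" ", "firstname", "lastname")` is usually the safer choice. The plain-Python sketch below mimics both behaviors with `None` standing in for NULL; the function names mirror Spark's but are local stand-ins, not the PySpark functions.

```python
# Local stand-ins for Spark's concat and concat_ws NULL handling,
# using None in place of SQL NULL.

def concat(*parts):
    # Spark's concat yields NULL as soon as any input is NULL.
    if any(p is None for p in parts):
        return None
    return "".join(parts)

def concat_ws(sep, *parts):
    # Spark's concat_ws ignores NULL inputs and joins the rest with sep.
    return sep.join(p for p in parts if p is not None)

print(concat("Catherine", " ", "Abel"))   # Catherine Abel
print(concat("Catherine", " ", None))     # None
print(concat_ws(" ", "Catherine", None))  # Catherine
```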

In [20]:
# Drop columns
dfDropCol = sparkDF.drop("firstname","lastname")
dfDropCol.show()

+----------+--------------------+
|customerid|            fullname|
+----------+--------------------+
|       293|      Catherine Abel|
|       295|     Kim Abercrombie|
|       297|    Humberto Acevedo|
|       291|      Gustavo Achong|
|       299|      Pilar Ackerman|
|       305|         Carla Adams|
|       301|       Frances Adams|
|       307|           Jay Adams|
|       309|        Ronald Adina|
|       311|     Samuel Agcaoili|
|       313|       James Aguilar|
|       315|     Robert Ahlering|
|       319|           Kim Akers|
|       441|        Stanley Alan|
|       323|         Amy Alberts|
|       325|       Anna Albright|
|       327|       Milton Albury|
|       329|         Paul Alcorn|
|       331|    Gregory Alderson|
|       333|J. Phillip Alexander|
+----------+--------------------+
only showing top 20 rows


In [21]:
# Renaming columns
dfRenameCols = sparkDF.withColumnRenamed("fullname", "full_name")
dfRenameCols.show()

+----------+----------+-----------+--------------------+
|customerid| firstname|   lastname|           full_name|
+----------+----------+-----------+--------------------+
|       293| Catherine|       Abel|      Catherine Abel|
|       295|       Kim|Abercrombie|     Kim Abercrombie|
|       297|  Humberto|    Acevedo|    Humberto Acevedo|
|       291|   Gustavo|     Achong|      Gustavo Achong|
|       299|     Pilar|   Ackerman|      Pilar Ackerman|
|       305|     Carla|      Adams|         Carla Adams|
|       301|   Frances|      Adams|       Frances Adams|
|       307|       Jay|      Adams|           Jay Adams|
|       309|    Ronald|      Adina|        Ronald Adina|
|       311|    Samuel|   Agcaoili|     Samuel Agcaoili|
|       313|     James|    Aguilar|       James Aguilar|
|       315|    Robert|   Ahlering|     Robert Ahlering|
|       319|       Kim|      Akers|           Kim Akers|
|       441|   Stanley|       Alan|        Stanley Alan|
|       323|       Amy|    Albe

In [22]:
# Grouping and aggregating are easier with Spark DataFrames:
# group on last name and count how many times each appears

sparkDF.groupBy("lastname").count().show()

+--------+-----+
|lastname|count|
+--------+-----+
|  Achong|    1|
|  Bailey|    1|
|   Caron|    1|
|   Casts|    1|
|   Curry|    1|
| Desalvo|    1|
| Dockter|    1|
|    Dyck|    1|
|  Farino|    1|
| Fluegel|    1|
|   Ganio|    1|
|   Gimmi|    1|
|Gonzales|    2|
|   Graff|    1|
|   Groth|    1|
|    Hass|    1|
| Hassall|    1|
|Hillmann|    1|
| Hodgson|    1|
|   Ihrig|    1|
+--------+-----+
only showing top 20 rows
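`groupBy("lastname").count()` produces one row per distinct last name together with the number of rows in that group. The same aggregation in plain Python is a `collections.Counter` over the column values; the sample below uses last names that appear in this notebook's output, including the three Adams customers.

```python
from collections import Counter

# Plain-Python equivalent of sparkDF.groupBy("lastname").count():
# one entry per distinct last name, mapped to how many rows share it.
last_names = ["Abel", "Achong", "Adams", "Adams", "Adams", "Agcaoili"]

counts = Counter(last_names)
for name, n in sorted(counts.items()):
    print(name, n)
```

As in Spark, the grouped result has no inherent order; the sketch sorts explicitly, and in Spark you would add `.orderBy(...)` to get a deterministic ordering.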


In [23]:
# Filtering on lastname Adams using Spark Data Frame
sparkDF.filter(sparkDF["lastname"]=="Adams").show()

+----------+---------+--------+-------------+
|customerid|firstname|lastname|     fullname|
+----------+---------+--------+-------------+
|       305|    Carla|   Adams|  Carla Adams|
|       301|  Frances|   Adams|Frances Adams|
|       307|      Jay|   Adams|    Jay Adams|
+----------+---------+--------+-------------+


In [24]:
# where() is an alias for filter() and also accepts a SQL expression string
sparkDF.where("lastname = 'Adams'").show()

+----------+---------+--------+-------------+
|customerid|firstname|lastname|     fullname|
+----------+---------+--------+-------------+
|       305|    Carla|   Adams|  Carla Adams|
|       301|  Frances|   Adams|Frances Adams|
|       307|      Jay|   Adams|    Jay Adams|
+----------+---------+--------+-------------+


In [25]:
# Spark DataFrames support inner, left, right, full outer, and cross joins,
# whereas DynamicFrame.join only performs an equality (inner) join.

# Read the orders table from the Glue Data Catalog as a DynamicFrame and convert it to a Spark DataFrame
dfOrders = glueContext.create_dynamic_frame.from_catalog(
    database="pyspark_tutorial_db",
    table_name="orders"
).toDF()

In [26]:
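# Inner join orders to customers, equivalent to:
# SELECT * FROM sparkDF a INNER JOIN dfOrders b ON a.customerid = b.customerid
# The result has two customerid columns; joining on the column name instead,
# sparkDF.join(dfOrders, "customerid", "inner"), keeps a single one.
sparkDF.join(dfOrders, sparkDF.customerid == dfOrders.customerid, "inner").show()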

+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|customerid|firstname|lastname|      fullname|salesorderid|salesorderdetailid|orderdate|  duedate|shipdate|employeeid|customerid|  subtotal|   taxamt| freight|  totaldue|productid|orderqty|unitprice|unitpricediscount|linetotal|
+----------+---------+--------+--------------+------------+------------------+---------+---------+--------+----------+----------+----------+---------+--------+----------+---------+--------+---------+-----------------+---------+
|       517|  Richard|  Bready|Richard Bready|       43665|                61|5/31/2011|6/12/2011|6/7/2011|       283|       517|14352.7713|1375.9427|429.9821|16158.6961|      711|       2|  20.1865|           0.0000|  40.3730|
|       517|  Richard|  Bready|Richard Bready|       43665|                62|5/31/2011|

In [27]:
# Keep only customers whose last name is Adams
dfAdams = sparkDF.where("lastname = 'Adams'")

# Inner join the Adams customers to their orders:
# only rows with a matching customerid on both sides are returned
dfAdams.join(dfOrders, dfAdams.customerid == dfOrders.customerid, "inner").show()

+----------+---------+--------+---------+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+
|customerid|firstname|lastname| fullname|salesorderid|salesorderdetailid| orderdate|   duedate| shipdate|employeeid|customerid|  subtotal|   taxamt|  freight|  totaldue|productid|orderqty|unitprice|unitpricediscount|linetotal|
+----------+---------+--------+---------+------------+------------------+----------+----------+---------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+
|       307|      Jay|   Adams|Jay Adams|       48382|             23857|10/30/2012|11/11/2012|11/6/2012|       277|       307|   20.5200|   2.0246|   0.6327|   23.1773|      805|       1|  20.5200|           0.0000|  20.5200|
|       307|      Jay|   Adams|Jay Adams|       50734|             34874| 4/30/2013| 5/12/20

In [28]:
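# Left join orders to the Adams DataFrame: every order is kept, and the
# customer columns are NULL for orders placed by non-Adams customers
dfOrders.join(dfAdams, dfAdams.customerid == dfOrders.customerid, "left").show(100)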

+------------+------------------+---------+---------+--------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+----------+---------+--------+--------+
|salesorderid|salesorderdetailid|orderdate|  duedate|shipdate|employeeid|customerid|  subtotal|   taxamt|  freight|  totaldue|productid|orderqty|unitprice|unitpricediscount|linetotal|customerid|firstname|lastname|fullname|
+------------+------------------+---------+---------+--------+----------+----------+----------+---------+---------+----------+---------+--------+---------+-----------------+---------+----------+---------+--------+--------+
|       43665|                61|5/31/2011|6/12/2011|6/7/2011|       283|       517|14352.7713|1375.9427| 429.9821|16158.6961|      711|       2|  20.1865|           0.0000|  40.3730|      NULL|     NULL|    NULL|    NULL|
|       43665|                62|5/31/2011|6/12/2011|6/7/2011|       283|       517|14352.7713|1375.9427| 42
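In the left join above, every order row is kept, and the Adams customer columns are NULL wherever the order's customer is not an Adams. The sketch below models left-join semantics on plain dicts, with `None` for NULL, using two orders that appear in the outputs above. `left_join` is a hypothetical helper, not a Spark API.

```python
# Hypothetical model of a left outer equi-join:
# every left row is kept; unmatched rows get None for the right-side fields.

def left_join(left, right, key, right_fields):
    # Index right-hand rows by join key.
    index = {}
    for rrow in right:
        index.setdefault(rrow[key], []).append(rrow)
    out = []
    for lrow in left:
        matches = index.get(lrow[key])
        if matches:
            for rrow in matches:
                out.append({**lrow, **{f: rrow[f] for f in right_fields}})
        else:
            out.append({**lrow, **{f: None for f in right_fields}})
    return out

orders = [{"salesorderid": 43665, "customerid": 517},
          {"salesorderid": 48382, "customerid": 307}]
adams = [{"customerid": 307, "fullname": "Jay Adams"}]

for row in left_join(orders, adams, "customerid", ["fullname"]):
    print(row)
```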

In [30]:
# A common pattern: read with a DynamicFrame, convert to a Spark DataFrame
# to apply transformations, then convert back to a DynamicFrame to write
# the result with Glue's writers (S3 or Data Catalog)

# Import the DynamicFrame class
from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back into a DynamicFrame and display it
dyfCustomersConvert = DynamicFrame.fromDF(sparkDF, glueContext, "convert")
dyfCustomersConvert.show()

{"customerid": 293, "firstname": "Catherine", "lastname": "Abel", "fullname": "Catherine Abel"}
{"customerid": 295, "firstname": "Kim", "lastname": "Abercrombie", "fullname": "Kim Abercrombie"}
{"customerid": 297, "firstname": "Humberto", "lastname": "Acevedo", "fullname": "Humberto Acevedo"}
{"customerid": 291, "firstname": "Gustavo", "lastname": "Achong", "fullname": "Gustavo Achong"}
{"customerid": 299, "firstname": "Pilar", "lastname": "Ackerman", "fullname": "Pilar Ackerman"}
{"customerid": 305, "firstname": "Carla", "lastname": "Adams", "fullname": "Carla Adams"}
{"customerid": 301, "firstname": "Frances", "lastname": "Adams", "fullname": "Frances Adams"}
{"customerid": 307, "firstname": "Jay", "lastname": "Adams", "fullname": "Jay Adams"}
{"customerid": 309, "firstname": "Ronald", "lastname": "Adina", "fullname": "Ronald Adina"}
{"customerid": 311, "firstname": "Samuel", "lastname": "Agcaoili", "fullname": "Samuel Agcaoili"}
{"customerid": 313, "firstname": "James", "lastname": 

In [32]:
# Write the DynamicFrame to S3 as CSV.
# This targets the same prefix as the earlier write; Glue's S3 sink adds new
# files rather than replacing existing ones.
glueContext.write_dynamic_frame.from_options(
    frame=dyfCustomersConvert,
    connection_type="s3",
    connection_options={"path": "s3://luis-delotavo-pyspark-tutorial/write_down_dyf_to_s3/"},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink3"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f513b192b50>


In [33]:
# Write the DynamicFrame to the customers_write_dyf table in the Glue Data Catalog
# (the data is stored at the table's S3 location)
glueContext.write_dynamic_frame.from_catalog(
    frame=dyfCustomersConvert,
    database="pyspark_tutorial_db",
    table_name="customers_write_dyf"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f513b192a10>
