# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 8fe93770-898a-450b-87d0-592154a8cec3
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 8fe93770-898a-450b-87d0-592154a8cec3 to get into ready status...
Session 8fe93770-898a-450b-87d0-592154a8cec3 has been created.



# 1. Extracting Data

## 1.1 Extracting data from APIs (e.g., REST API)

In [3]:
import requests
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

# Fetch the JSON response using the requests library; fail fast on HTTP errors
response = requests.get("https://dummyjson.com/products/1", timeout=30)
response.raise_for_status()
data = response.json()
print(data)

# Explicit schema for the product fields to keep
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("title", StringType(), nullable=True),
    StructField("description", StringType(), nullable=True),
    StructField("price", FloatType(), nullable=True),
    StructField("discountPercentage", FloatType(), nullable=True),
    StructField("rating", FloatType(), nullable=True),
    StructField("stock", IntegerType(), nullable=True),
    StructField("brand", StringType(), nullable=True),
    StructField("category", StringType(), nullable=True),
    StructField("thumbnail", StringType(), nullable=True),
    StructField("images", ArrayType(StringType()), nullable=True)
])
dataframe = spark.createDataFrame([data], schema=schema)

# Display the DataFrame
dataframe.show()


{'id': 1, 'title': 'Essence Mascara Lash Princess', 'description': 'The Essence Mascara Lash Princess is a popular mascara known for its volumizing and lengthening effects. Achieve dramatic lashes with this long-lasting and cruelty-free formula.', 'category': 'beauty', 'price': 9.99, 'discountPercentage': 7.17, 'rating': 4.94, 'stock': 5, 'tags': ['beauty', 'mascara'], 'brand': 'Essence', 'sku': 'RCH45Q1A', 'weight': 2, 'dimensions': {'width': 23.17, 'height': 14.43, 'depth': 28.01}, 'warrantyInformation': '1 month warranty', 'shippingInformation': 'Ships in 1 month', 'availabilityStatus': 'Low Stock', 'reviews': [{'rating': 2, 'comment': 'Very unhappy with my purchase!', 'date': '2024-05-23T08:56:21.618Z', 'reviewerName': 'John Doe', 'reviewerEmail': 'john.doe@x.dummyjson.com'}, {'rating': 2, 'comment': 'Not as described!', 'date': '2024-05-23T08:56:21.618Z', 'reviewerName': 'Nolan Gonzalez', 'reviewerEmail': 'nolan.gonzalez@x.dummyjson.com'}, {'rating': 5, 'comment': 'Very satisfied!
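The cell above passes the raw response dict straight to `createDataFrame`. With an explicit `StructType`, PySpark looks each field up by name, so keys outside the schema (`tags`, `reviews`, `dimensions`, ...) are simply not loaded. Its schema check does, however, reject a Python `int` for a `FloatType` field, which would break the cell if the API ever returned a whole-number `price` such as `10`. The sketch below is a plain-Python pre-processing step under that assumption; `SCHEMA_FIELDS` and `project_record` are hypothetical names, not part of Glue or PySpark.

```python
from typing import Any, Dict

# Hypothetical mirror of the StructType above: field name -> expected Python type
SCHEMA_FIELDS: Dict[str, type] = {
    "id": int,
    "title": str,
    "description": str,
    "price": float,
    "discountPercentage": float,
    "rating": float,
    "stock": int,
    "brand": str,
    "category": str,
    "thumbnail": str,
    "images": list,
}


def project_record(record: Dict[str, Any],
                   fields: Dict[str, type] = SCHEMA_FIELDS) -> Dict[str, Any]:
    """Keep only the schema's fields (missing ones become None) and widen
    ints to floats where the schema expects a float."""
    projected: Dict[str, Any] = {}
    for name, expected in fields.items():
        value = record.get(name)
        # bool is a subclass of int, so exclude it explicitly
        if expected is float and isinstance(value, int) and not isinstance(value, bool):
            value = float(value)
        projected[name] = value
    return projected


sample = {"id": 1, "title": "Essence Mascara Lash Princess", "price": 10, "tags": ["beauty"]}
row = project_record(sample)
print(row["price"], "tags" in row, row["brand"])  # 10.0 False None
```

Inside the session it would be used as `spark.createDataFrame([project_record(data)], schema=schema)`.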

## 1.2 Extracting data from files (e.g., CSV)

In [6]:
df_orders = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/orders/19980505")
df_categories = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/categories")
df_customers = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/customers")
df_orders_details = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/orders_details")
df_products = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/products")
df_suppliers = spark.read.format("csv").option("header", "true").load("s3://asdfgh-pyspark/landing-zone/suppliers")

# Display the DataFrame
df_orders.show()

+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+
|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|
+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+
|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|
|  10249|     TOMSP|         6|1996-07-05|  1996-08-16| 1996-07-10|      1|  11.61|  Toms Spezialitäten|       Luisenstr. 48|       Münster|      null|         44087|    Germany|
|  10250|     HANAR|         4|1996-07-08|  1996-08-05| 1996-07-12|      2|  65.83|       Hanari Carnes| 
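Because `inferSchema` is not set, Spark's CSV reader returns every column as `string`, which is why all of the DDL generated further down uses `string` types. The six reads above differ only in their S3 prefix. A minimal sketch of building the paths from one mapping, assuming the same bucket and prefixes (the helper name `landing_paths` is hypothetical):

```python
from typing import Dict

BUCKET = "s3://asdfgh-pyspark"

# Table name -> landing-zone prefix, as read in the cell above
LANDING_PREFIXES: Dict[str, str] = {
    "orders": "landing-zone/orders/19980505",
    "categories": "landing-zone/categories",
    "customers": "landing-zone/customers",
    "orders_details": "landing-zone/orders_details",
    "products": "landing-zone/products",
    "suppliers": "landing-zone/suppliers",
}


def landing_paths(bucket: str = BUCKET,
                  prefixes: Dict[str, str] = LANDING_PREFIXES) -> Dict[str, str]:
    """Return the full S3 path of each table's landing-zone folder."""
    return {name: f"{bucket.rstrip('/')}/{prefix}" for name, prefix in prefixes.items()}


paths = landing_paths()
# In the Glue session, the six reads could then become one comprehension:
# frames = {name: spark.read.format("csv").option("header", "true").load(path)
#           for name, path in paths.items()}
```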

## 1.3 Extracting data from a database using JDBC connection

In [None]:
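# Fill in the <...> placeholders; if the PostgreSQL JDBC driver is not already on the classpath, add it with %extra_jars
jdbc_url = "jdbc:postgresql://<host>:<port>/<database>"
db_properties = {"user": "<username>", "password": "<password>", "driver": "org.postgresql.Driver"}
# The JDBC source requires "dbtable" (or "query"); without it, load() fails
dataframe = spark.read.format("jdbc").options(url=jdbc_url, dbtable="<schema>.<table>", **db_properties).load()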
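Spark's JDBC source needs either a `dbtable` or a `query` option and does not accept both together. The sketch below builds the option map and enforces that rule. The helper `jdbc_read_options`, the URL, the credentials and the query are all hypothetical placeholders.

```python
from typing import Dict, Optional


def jdbc_read_options(url: str, user: str, password: str,
                      table: Optional[str] = None,
                      query: Optional[str] = None) -> Dict[str, str]:
    """Build options for spark.read.format("jdbc") with exactly one of table/query."""
    if (table is None) == (query is None):
        raise ValueError("pass exactly one of table or query")
    options = {
        "url": url,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    if table is not None:
        options["dbtable"] = table
    else:
        options["query"] = query
    return options


# Hypothetical connection details, for illustration only
opts = jdbc_read_options(
    "jdbc:postgresql://db.example.internal:5432/northwind",
    user="etl_user",
    password="change-me",
    query="SELECT orderid, orderdate FROM orders WHERE orderdate >= '1998-05-05'",
)
```

The resulting map would be passed as `spark.read.format("jdbc").options(**opts).load()`. Using `query` lets the database filter rows before they reach Spark.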


# 2. Loading Data

## 2.1 Loading Data to S3 Raw Zone

In [8]:
from pyspark.sql.functions import year, month, dayofmonth, format_string

df_orders = df_orders.withColumn("year", year(df_orders['orderdate']))
df_orders = df_orders.withColumn("month", format_string("%02d", month(df_orders['orderdate'])))
df_orders = df_orders.withColumn("day", format_string("%02d",dayofmonth(df_orders['orderdate'])))



df_orders.write.partitionBy("year", "month", "day").parquet("s3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data/")
df_categories.write.parquet("s3://asdfgh-pyspark/raw_zone_pyspark/categories/")
df_customers.write.parquet("s3://asdfgh-pyspark/raw_zone_pyspark/customers/")
df_orders_details.write.parquet("s3://asdfgh-pyspark/raw_zone_pyspark/orders_details/")
df_products.write.parquet("s3://asdfgh-pyspark/raw_zone_pyspark/products/")
df_suppliers.write.parquet("s3://asdfgh-pyspark/raw_zone_pyspark/suppliers/")

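AnalysisException: path s3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data already exists.

The exception comes from Spark's default save mode, `errorifexists`: the path had already been written by an earlier run of this cell, and because the first write failed, the remaining writes in the cell did not run either. Adding `.mode("overwrite")` or `.mode("append")` before `.parquet(...)` makes the cell re-runnable. For the partitioned orders data, `overwrite` replaces everything under the path unless `spark.sql.sources.partitionOverwriteMode` is set to `dynamic`, in which case only the partitions present in the DataFrame are replaced.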
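`partitionBy("year", "month", "day")` writes one Hive-style `column=value` directory per distinct date, nested in the order given. Since `month` and `day` were built with `format_string("%02d", ...)`, the folder names are zero-padded. A plain-Python sketch of the directory a given order date lands in (`partition_path` is a hypothetical helper):

```python
from datetime import date


def partition_path(order_date: str) -> str:
    """Return the sub-directory partitionBy("year", "month", "day") would use for
    a row with this orderdate, mirroring year() and format_string("%02d", ...)."""
    d = date.fromisoformat(order_date)
    return f"year={d.year}/month={d.month:02d}/day={d.day:02d}"


base = "s3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data"
print(f"{base}/{partition_path('1996-07-04')}/")
# s3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data/year=1996/month=07/day=04/
```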


## 2.2 Create Athena Table to read data from Raw Zone

In [9]:
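def generate_ddl(dataframe, table_name, db_name, zone):
    """Build a CREATE EXTERNAL TABLE statement from a DataFrame's schema and run it."""
    ddl_statement = f"CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}.{table_name} (\n"

    # One "<column> <type>" line per field, using Spark's simple type names (string, date, ...)
    for field in dataframe.schema.fields:
        column_name = field.name
        data_type = field.dataType.simpleString()
        ddl_statement += f"  {column_name} {data_type},\n"

    # Drop the trailing ",\n" left by the last column
    ddl_statement = ddl_statement.rstrip(",\n")
    ddl_statement += f"\n)\nSTORED AS PARQUET\nLOCATION 's3://asdfgh-pyspark/{zone}_pyspark/{table_name}/'"

    # Print the DDL statement
    print(ddl_statement)

    # Register the table in the Glue Data Catalog (the session runs with --enable-glue-datacatalog),
    # which is what makes it queryable from Athena
    spark.sql(ddl_statement)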
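`generate_ddl` builds the column list by appending and then trimming the trailing comma. A sketch of the same DDL rendered with `str.join`, taking `(name, type)` pairs and an explicit location instead of a hard-coded bucket. Backtick-quoting the column names is an added assumption that guards against reserved words, and `build_ddl` is a hypothetical name.

```python
from typing import List, Tuple


def build_ddl(columns: List[Tuple[str, str]], table: str, db: str, location: str) -> str:
    """Render a CREATE EXTERNAL TABLE statement from (name, type) pairs."""
    # join() places commas only between columns, so no trailing comma to strip
    column_lines = ",\n".join(f"  `{name}` {dtype}" for name, dtype in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table} (\n"
        f"{column_lines}\n"
        ")\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


ddl = build_ddl(
    [("categoryid", "string"), ("categoryname", "string")],
    "categories",
    "raw_zone",
    "s3://asdfgh-pyspark/raw_zone_pyspark/categories/",
)
print(ddl)
```

Inside the session the pairs would come from `[(f.name, f.dataType.simpleString()) for f in df_categories.schema.fields]`.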




In [10]:
db_name = "raw_zone"
generate_ddl(df_categories, "categories", db_name, "raw_zone")
generate_ddl(df_customers, "customers", db_name, "raw_zone")
generate_ddl(df_orders_details,"orders_details", db_name, "raw_zone")
generate_ddl(df_products, "products", db_name, "raw_zone")
generate_ddl(df_suppliers, "suppliers", db_name, "raw_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS raw_zone.categories (
  categoryid string,
  categoryname string,
  description string,
  picture string
)
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/raw_zone_pyspark/categories/'
CREATE EXTERNAL TABLE IF NOT EXISTS raw_zone.customers (
  customerid string,
  companyname string,
  contactname string,
  contacttitle string,
  address string,
  city string,
  region string,
  postalcode string,
  country string,
  phone string,
  fax string
)
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/raw_zone_pyspark/customers/'
CREATE EXTERNAL TABLE IF NOT EXISTS raw_zone.orders_details (
  orderid string,
  productid string,
  unitprice string,
  quantity string,
  discount string
)
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/raw_zone_pyspark/orders_details/'
CREATE EXTERNAL TABLE IF NOT EXISTS raw_zone.products (
  productid string,
  productname string,
  supplierid string,
  categoryid string,
  quantityperunit string,
  unitprice string,
  unitsins

In [11]:
def generate_ddl_with_partition(dataframe, table_name, db_name, zone):
    """Like generate_ddl, but declares year/month/day as partition columns and registers existing partitions."""
    ddl_statement = f"CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}.{table_name} (\n"

    # Partition columns must not also appear in the regular column list
    for field in dataframe.schema.fields:
        column_name = field.name
        if column_name not in ["year", "month", "day"]:
            data_type = field.dataType.simpleString()
            ddl_statement += f"  {column_name} {data_type},\n"

    ddl_statement = ddl_statement.rstrip(",\n")
    ddl_statement += """\n)
                        PARTITIONED BY ( 
                          `year` int, 
                          `month` int, 
                          `day` int)
                        """
    ddl_statement += f"\nSTORED AS PARQUET\nLOCATION 's3://asdfgh-pyspark/{zone}_pyspark/{table_name}/partitioned_data/'"

    # Print the DDL statement
    print(ddl_statement)

    # Register the table in the Glue Data Catalog, then scan its S3 location for
    # year=/month=/day= folders and register them as partitions
    spark.sql(ddl_statement)
    spark.sql(f"MSCK REPAIR TABLE {db_name}.{table_name}")




In [12]:
generate_ddl_with_partition(df_orders, "orders", db_name, "raw_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS raw_zone.orders (
  orderid string,
  customerid string,
  employeeid string,
  orderdate string,
  requireddate string,
  shippeddate string,
  shipvia string,
  freight string,
  shipname string,
  shipaddress string,
  shipcity string,
  shipregion string,
  shippostalcode string,
  shipcountry string
)
                        PARTITIONED BY ( 
                          `year` int, 
                          `month` int, 
                          `day` int)
                        
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data/'
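`MSCK REPAIR TABLE` discovers partitions by scanning the whole table location, which can become slow as daily partitions accumulate. When the dates of a new load are known, registering only those partitions with `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` is a common alternative. The sketch below only builds the statements (each would then be run with `spark.sql`); the explicit `LOCATION` points the integer partition values at the zero-padded folders that `partitionBy` wrote. `add_partition_statements` is a hypothetical helper.

```python
from typing import Iterable, List


def add_partition_statements(db: str, table: str, base_location: str,
                             order_dates: Iterable[str]) -> List[str]:
    """Build one ALTER TABLE ... ADD PARTITION statement per distinct yyyy-MM-dd date."""
    statements = []
    for iso in sorted(set(order_dates)):
        year, month, day = iso.split("-")
        # Folders were written zero-padded (month=07); partition values are ints (month=7)
        location = f"{base_location.rstrip('/')}/year={year}/month={month}/day={day}/"
        statements.append(
            f"ALTER TABLE {db}.{table} ADD IF NOT EXISTS "
            f"PARTITION (`year`={int(year)}, `month`={int(month)}, `day`={int(day)}) "
            f"LOCATION '{location}'"
        )
    return statements


stmts = add_partition_statements(
    "raw_zone", "orders",
    "s3://asdfgh-pyspark/raw_zone_pyspark/orders/partitioned_data/",
    ["1996-07-04", "1996-07-04", "1996-07-05"],
)
# In the Glue session: for stmt in stmts: spark.sql(stmt)
```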


# 3. Transforming data

In [15]:
# Joining data: build the product dimension from products, categories and suppliers
df_products = df_products.alias("products")
df_categories = df_categories.alias("categories")
df_suppliers = df_suppliers.alias("suppliers")

# how="right" keeps every category and every supplier row.
# drop("categoryid") / drop("supplierid") by name removes the join key from both sides.
df_dim_products = df_products \
                    .join(df_categories, df_products["categoryid"] == df_categories["categoryid"], how="right").drop("categoryid") \
                    .join(df_suppliers, df_products["supplierid"] == df_suppliers["supplierid"], how="right").drop("supplierid")
df_dim_products.show()

+---------+--------------------+-------------------+---------+------------+------------+------------+------------+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+--------------+--------------+--------------------+
|productid|         productname|    quantityperunit|unitprice|unitsinstock|unitsonorder|reorderlevel|discontinued|  categoryname|         description|picture|         companyname|         contactname|        contacttitle|             address|       city|  region|postalcode|  country|         phone|           fax|            homepage|
+---------+--------------------+-------------------+---------+------------+------------+------------+------------+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+--------------+--------------+--------------
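With `how="right"`, every category row survives the first join and every supplier row survives the second, even when no product references it (those rows get null product columns), while a product whose category or supplier is missing is dropped. If every product must appear in `dim_products`, `how="left"` is the more usual choice for a dimension; the two only differ when keys are unmatched. A small plain-Python model of right-join semantics, using hypothetical sample rows:

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def right_join(left: List[Row], right: List[Row], key: str) -> List[Row]:
    """Keep every row of `right`; merge in matching `left` rows, or None-filled
    left columns when nothing matches (a simplified model of a SQL right join)."""
    left_columns = {c for row in left for c in row if c != key}
    joined: List[Row] = []
    for rrow in right:
        matches = [lrow for lrow in left if lrow.get(key) == rrow.get(key)]
        if matches:
            joined.extend({**lrow, **rrow} for lrow in matches)
        else:
            joined.append({**{c: None for c in left_columns}, **rrow})
    return joined


# Hypothetical rows: category 9 has no products, product 2 points at a missing category
products = [
    {"productid": 1, "productname": "Chai", "categoryid": 1},
    {"productid": 2, "productname": "Orphan", "categoryid": 7},
]
categories = [
    {"categoryid": 1, "categoryname": "Beverages"},
    {"categoryid": 9, "categoryname": "Unused"},
]
result = right_join(products, categories, "categoryid")
```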

In [18]:

# Rename the detail table's key so the joined result has a single, unambiguous orderid column
df_orders_details = df_orders_details.withColumnRenamed("orderid", "orderid_2")
df_fact_orders_items = df_orders_details  \
                    .join(df_orders, df_orders["orderid"] == df_orders_details["orderid_2"], how="right").drop("orderid_2")
df_fact_orders_items.show()

+---------+---------+--------+--------+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+
|productid|unitprice|quantity|discount|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|year|month|day|
+---------+---------+--------+--------+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+
|       72|     34.8|       5|       0|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|1996|   07| 04|
|       42|      9.8|      10|       0|  10248|     VINET|         5|199

## 3.1 Transform Fact_Orders_Items

In [19]:
from pyspark.sql.functions import expr, to_date
# Add 543 years to orderdate (the Gregorian to Buddhist Era year offset) while keeping the -MM-dd part.
# substring() returns a string, so substring(...) + 543 is evaluated as a double (2539.0).
df_fact_orders_items = df_fact_orders_items.withColumn("new_year", expr("substring(orderdate, 1, 4) + 543"))
# Rebuild the date string; casting the double gives "2539.0-07-04" ...
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", expr("concat(cast(substring(orderdate, 1, 4) + 543 as string), substring(orderdate, 5))"))
# ... so strip the ".0" before parsing the string back into a date
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", expr("replace(cast(orderdate as string), '.0', '')"))
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", to_date("orderdate", "yyyy-MM-dd"))

df_fact_orders_items.show()

+---------+---------+--------+--------+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+--------+
|productid|unitprice|quantity|discount|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|year|month|day|new_year|
+---------+---------+--------+--------+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+--------+
|       72|     34.8|       5|       0|  10248|     VINET|         5|2539-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|1996|   07| 04|  2539.0|
|       42|      9.8|      10|      
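The Spark cell reaches `2539-07-04` through string arithmetic and by stripping `.0`. The same conversion in plain Python makes one edge case visible: a Gregorian leap year plus 543 is never a leap year (543 leaves remainder 3 when divided by 4), so February 29 has no counterpart. The sketch below clamps such dates to February 28; that clamping is an assumption, not something the notebook does, and `to_buddhist_era` is a hypothetical name.

```python
from datetime import date

BE_OFFSET = 543  # Buddhist Era year = Gregorian year + 543


def to_buddhist_era(iso_date: str) -> date:
    """Shift an ISO yyyy-MM-dd date by 543 years, keeping month and day."""
    d = date.fromisoformat(iso_date)
    be_year = d.year + BE_OFFSET
    try:
        return d.replace(year=be_year)
    except ValueError:
        # Only Feb 29 can fail: be_year is never a leap year when d.year is.
        # Clamping to Feb 28 is an assumption, not the notebook's behavior.
        return d.replace(year=be_year, day=28)


print(to_buddhist_era("1996-07-04").isoformat())  # 2539-07-04
```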

## 3.2 Load Transformed Data To Serving Zone

In [21]:
# drop() returns a new DataFrame, so the result must be assigned
df_fact_orders_items = df_fact_orders_items.drop("year", "month", "day")
# Re-derive the partition columns from the converted orderdate
df_fact_orders_items = df_fact_orders_items.withColumn("year", year(df_fact_orders_items['orderdate']))
df_fact_orders_items = df_fact_orders_items.withColumn("month", format_string("%02d", month(df_fact_orders_items['orderdate'])))
df_fact_orders_items = df_fact_orders_items.withColumn("day", format_string("%02d", dayofmonth(df_fact_orders_items['orderdate'])))

df_fact_orders_items.write.partitionBy("year", "month", "day").parquet("s3://asdfgh-pyspark/serving_zone_pyspark/fact_orders_items/partitioned_data/")

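AnalysisException: path s3://asdfgh-pyspark/serving_zone_pyspark/fact_orders_items/partitioned_data already exists.

This is Spark's default `errorifexists` save mode rejecting a path created by an earlier run. Writing with `.mode("overwrite")` (and `spark.sql.sources.partitionOverwriteMode` set to `dynamic` if partitions for other dates must be kept) makes the cell re-runnable.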


In [22]:
df_dim_products.write.parquet("s3://asdfgh-pyspark/serving_zone_pyspark/dim_products/")




In [27]:
db_name = "saving_zone"
generate_ddl(df_dim_products, "dim_products", db_name, "serving_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS saving_zone.dim_products (
  productid string,
  productname string,
  quantityperunit string,
  unitprice string,
  unitsinstock string,
  unitsonorder string,
  reorderlevel string,
  discontinued string,
  categoryname string,
  description string,
  picture string,
  companyname string,
  contactname string,
  contacttitle string,
  address string,
  city string,
  region string,
  postalcode string,
  country string,
  phone string,
  fax string,
  homepage string
)
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/serving_zone_pyspark/dim_products/'


In [28]:
generate_ddl_with_partition(df_fact_orders_items, "fact_orders_items", db_name, "serving_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS saving_zone.fact_orders_items (
  productid string,
  unitprice string,
  quantity string,
  discount string,
  orderid string,
  customerid string,
  employeeid string,
  orderdate date,
  requireddate string,
  shippeddate string,
  shipvia string,
  freight string,
  shipname string,
  shipaddress string,
  shipcity string,
  shipregion string,
  shippostalcode string,
  shipcountry string,
  new_year double
)
                        PARTITIONED BY ( 
                          `year` int, 
                          `month` int, 
                          `day` int)
                        
STORED AS PARQUET
LOCATION 's3://asdfgh-pyspark/serving_zone_pyspark/fact_orders_items/partitioned_data/'
