# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# 1. Extracting Data

## 1.1 Extracting data from APIs (e.g., REST API)

In [None]:
import requests
import json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

# Fetch the JSON response using requests library
response = requests.get("https://dummyjson.com/products/1")
data = response.json()
print(data)
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("title", StringType(), nullable=True),
    StructField("description", StringType(), nullable=True),
    StructField("price", IntegerType(), nullable=True),
    StructField("discountPercentage", FloatType(), nullable=True),
    StructField("rating", FloatType(), nullable=True),
    StructField("stock", IntegerType(), nullable=True),
    StructField("brand", StringType(), nullable=True),
    StructField("category", StringType(), nullable=True),
    StructField("thumbnail", StringType(), nullable=True),
    StructField("images", ArrayType(StringType()), nullable=True)
])
dataframe = spark.createDataFrame([data], schema=schema)

# Display the DataFrame
dataframe.show()


## 1.2 Extracting data from files (e.g., CSV)

In [None]:
df_orders = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/orders/19980505")
df_categories = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/categories")
df_customers = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/customers")
df_orders_details = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/orders_details")
df_products = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/products")
df_suppliers = spark.read.format("csv").option("header", "true").load("s3://ijdhad-mydemo/landing_zone/suppliers")

# Display the DataFrame
df_products.show()

## 1.3 Extracting data from a database using JDBC connection

In [None]:
jdbc_url = "jdbc:postgresql://<host>:<port>/<database>"
db_properties = {"user": "<username>", "password": "<password>", "driver": "org.postgresql.Driver"}
dataframe = spark.read.format("jdbc").options(url=jdbc_url, **db_properties).load()


# 2. Loading Data

## 2.1 Loading Data to S3 Raw Zone

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, format_string

df_orders = df_orders.withColumn("year", year(df_orders['orderdate']))
df_orders = df_orders.withColumn("month", format_string("%02d", month(df_orders['orderdate'])))
df_orders = df_orders.withColumn("day", format_string("%02d",dayofmonth(df_orders['orderdate'])))



df_orders.write.partitionBy("year", "month", "day").parquet("s3://ijdhad-mydemo/raw_zone_pyspark/orders/partitioned_data/")
df_categories.write.parquet("s3://ijdhad-mydemo/raw_zone_pyspark/categories/")
df_customers.write.parquet("s3://ijdhad-mydemo/raw_zone_pyspark/customers/")
df_orders_details.write.parquet("s3://ijdhad-mydemo/raw_zone_pyspark/orders_details/")
df_products.write.parquet("s3://ijdhad-mydemo/raw_zone_pyspark/products/")
df_suppliers.write.parquet("s3://ijdhad-mydemo/raw_zone_pyspark/suppliers/")

## 2.2 Create Athena Table to read data from Raw Zone

In [145]:
def generate_ddl(dataframe, table_name, db_name, zone):
    ddl_statement = f"CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}.{table_name} (\n"
    
    for field in dataframe.schema.fields:
        column_name = field.name
        data_type = field.dataType.simpleString()
        ddl_statement += f"  {column_name} {data_type},\n"
    
    ddl_statement = ddl_statement.rstrip(",\n")
    ddl_statement += f"\n)\nSTORED AS PARQUET\nLOCATION 's3://ijdhad-mydemo/{zone}_pyspark/{table_name}/'"
    

    # Print the DDL statement
    print(ddl_statement)
                  
    # Create Athena Table
    spark.sql(ddl_statement)




In [None]:
db_name = "demo"
generate_ddl(df_categories, "categories", db_name, "raw_zone")
generate_ddl(df_customers, "customers", db_name, "raw_zone")
generate_ddl(df_orders_details,"orders_details", db_name, "raw_zone")
generate_ddl(df_products, "products", db_name, "raw_zone")
generate_ddl(df_suppliers, "suppliers", db_name, "raw_zone")

In [141]:
def generate_ddl_with_partition(dataframe, table_name, db_name, zone):
    ddl_statement = f"CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}.{table_name} (\n"
    
    for field in dataframe.schema.fields:
        column_name = field.name
        if column_name not in ["year", "month", "day"]:
            data_type = field.dataType.simpleString()
            ddl_statement += f"  {column_name} {data_type},\n"
    
    ddl_statement = ddl_statement.rstrip(",\n")
    ddl_statement += f"""\n)
                        PARTITIONED BY ( 
                          `year` int, 
                          `month` int, 
                          `day` int)
                        """
    ddl_statement += f"\nSTORED AS PARQUET\nLOCATION 's3://ijdhad-mydemo/{zone}_pyspark/{table_name}/partitioned_data/'"
    

    # Print the DDL statement
    print(ddl_statement)
                  
    # Create Athena Table
    spark.sql(ddl_statement)
    spark.sql(f"MSCK REPAIR TABLE {db_name}.{table_name}")




In [None]:
generate_ddl_with_partition(df_orders, "orders", db_name, "raw_zone")

# 3. Transforming data

In [125]:
# Joining data
df_products = df_products.alias("products")
df_categories = df_categories.alias("categories")
df_suppliers = df_suppliers.alias("suppliers")

df_dim_products = df_products \
                    .join(df_categories, df_products["categoryid"] == df_categories["categoryid"], how="right").drop(*["categoryid"]) \
                    .join(df_suppliers,df_products["supplierid"] == df_suppliers["supplierid"], how="right").drop(*["supplierid"])
df_dim_products.show()

+---------+--------------------+-------------------+---------+------------+------------+------------+------------+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+--------------+--------------+--------------------+
|productid|         productname|    quantityperunit|unitprice|unitsinstock|unitsonorder|reorderlevel|discontinued|  categoryname|         description|picture|         companyname|         contactname|        contacttitle|             address|       city|  region|postalcode|  country|         phone|           fax|            homepage|
+---------+--------------------+-------------------+---------+------------+------------+------------+------------+--------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----------+--------+----------+---------+--------------+--------------+--------------

In [128]:

df_orders_details = df_orders_details.withColumnRenamed("orderid", "orderid_2")
df_fact_orders_items = df_orders_details  \
                    .join(df_orders, df_orders["orderid"] == df_orders_details["orderid_2"], how="right").drop(*["orderid_2"])
df_fact_orders_items.show()

+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+---------+---------+--------+--------+
|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|year|month|day|productid|unitprice|quantity|discount|
+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+---------+---------+--------+--------+
|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|1996|   07| 04|       11|       14|      12|       0|
|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|     

## 3.1 Trasnform Fact_Orders_Items

In [129]:
from pyspark.sql.functions import expr, concat, substring, col
# Add 543 years to the orderdate column while maintaining month-day format
df_fact_orders_items = df_fact_orders_items.withColumn("new_year", expr("substring(orderdate, 1, 4) + 543"))
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", expr("concat(cast(substring(orderdate, 1, 4) + 543 as string), substring(orderdate, 5))"))
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", expr("replace(cast(orderdate as string), '.0', '')"))
df_fact_orders_items = df_fact_orders_items.withColumn("orderdate", to_date("orderdate", "yyyy-MM-dd"))

df_fact_orders_items.show()

+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+---------+---------+--------+--------+--------+
|orderid|customerid|employeeid| orderdate|requireddate|shippeddate|shipvia|freight|            shipname|         shipaddress|      shipcity|shipregion|shippostalcode|shipcountry|year|month|day|productid|unitprice|quantity|discount|new_year|
+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+----+-----+---+---------+---------+--------+--------+--------+
|  10248|     VINET|         5|2539-07-04|  1996-08-01| 1996-07-16|      3|  32.38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|1996|   07| 04|       11|       14|      12|       0|  2539.0|
|  10248|     VINET|         5|2539-

## 3.2 Load Transformed Data To Serving Zone

In [133]:
df_fact_orders_items.drop(*["year","month","day"])
df_fact_orders_items = df_fact_orders_items.withColumn("year", year(df_fact_orders_items['orderdate']))
df_fact_orders_items = df_fact_orders_items.withColumn("month", format_string("%02d", month(df_fact_orders_items['orderdate'])))
df_fact_orders_items = df_fact_orders_items.withColumn("day", format_string("%02d",dayofmonth(df_fact_orders_items['orderdate'])))

df_fact_orders_items.write.partitionBy("year", "month", "day").parquet("s3://ijdhad-mydemo/serving_zone_pyspark/fact_orders_items/partitioned_data/")

AnalysisException: path s3://ijdhad-mydemo/serving_zone_pyspark/fact_orders_items/partitioned_data already exists.


In [134]:
df_dim_products.write.parquet("s3://ijdhad-mydemo/serving_zone_pyspark/dim_products/")




In [146]:
generate_ddl(df_dim_products, "dim_products", db_name, "serving_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS demo.dim_products (
  productid string,
  productname string,
  quantityperunit string,
  unitprice string,
  unitsinstock string,
  unitsonorder string,
  reorderlevel string,
  discontinued string,
  categoryname string,
  description string,
  picture string,
  companyname string,
  contactname string,
  contacttitle string,
  address string,
  city string,
  region string,
  postalcode string,
  country string,
  phone string,
  fax string,
  homepage string
)
STORED AS PARQUET
LOCATION 's3://ijdhad-mydemo/serving_zone_pyspark/dim_products/'


In [143]:
generate_ddl_with_partition(df_fact_orders_items, "fact_orders_items", db_name, "serving_zone")

CREATE EXTERNAL TABLE IF NOT EXISTS demo.fact_orders_items (
  orderid string,
  customerid string,
  employeeid string,
  orderdate date,
  requireddate string,
  shippeddate string,
  shipvia string,
  freight string,
  shipname string,
  shipaddress string,
  shipcity string,
  shipregion string,
  shippostalcode string,
  shipcountry string,
  productid string,
  unitprice string,
  quantity string,
  discount string,
  new_year double
)
                        PARTITIONED BY ( 
                          `year` int, 
                          `month` int, 
                          `day` int)
                        
STORED AS PARQUET
LOCATION 's3://ijdhad-mydemo/serving_zone_pyspark/fact_orders_items/partitioned_data/'
