In [87]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import col, lpad, to_timestamp, when, datediff, to_date, count, sum, expr, first, mean, avg

# Reuse the SparkContext of the running session (or create one) and wrap it
# in a GlueContext, which provides the catalog readers and S3 writers used below.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Plain SparkSession; used later to build a Spark DataFrame from pandas.
spark = glueContext.spark_session
# NOTE(review): `job` is never initialised or committed in the cells visible
# here — confirm whether it is still needed.
job = Job(glueContext)

You are already connected to a glueetl session 0b3af2dd-84cf-4717-894b-df2f37519480.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 0b3af2dd-84cf-4717-894b-df2f37519480.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 0b3af2dd-84cf-4717-894b-df2f37519480.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 0b3af2dd-84cf-4717-894b-df2f37519480.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 2
Setting new number of workers to: 2



In [88]:
def _catalog_frame(table_name):
    """Return the given 'ecommerce-datalake' catalog table as a Glue DynamicFrame."""
    return glueContext.create_dynamic_frame.from_catalog(
        database='ecommerce-datalake',
        table_name=table_name,
    )


# Tables that get transformed below are converted to Spark DataFrames.
silver_orders = _catalog_frame('orders').toDF()
silver_order_items = _catalog_frame('orders_items').toDF()
silver_order_reviews = _catalog_frame('orders_reviews').toDF()
silver_order_payments = _catalog_frame('orders_payments').toDF()
silver_product = _catalog_frame('product').toDF()
silver_customers = _catalog_frame('customers').toDF()

# Sellers intentionally stay a DynamicFrame (no .toDF()): the frame is handed
# unchanged to write_dynamic_frame when the seller dimension is written.
silver_seller = _catalog_frame('sellers')



In [89]:
# Statuses that are collapsed into the single 'in-progress' bucket.
pending_statuses = ['invoiced', 'shipped', 'processing', 'created', 'approved']

# Branch order matters: a pending status wins over the delivered-date check;
# any other status with a delivery date becomes 'delivered'; the rest is kept.
normalized_status = (
    when(col('order_status').isin(pending_statuses), 'in-progress')
    .when(col('order_delivered_customer_date').isNotNull(), 'delivered')
    .otherwise(col('order_status'))
)

purchased_at = col('order_purchase_timestamp')

silver_orders = (
    silver_orders
    .withColumn('order_status', normalized_status)
    # Columns not carried forward.
    .drop('order_approved_at', 'order_delivered_carrier_date')
    # Day differences relative to the purchase timestamp.
    .withColumn(
        'estimated_days_to_delivery',
        datediff(col('order_estimated_delivery_date'), purchased_at),
    )
    .withColumn(
        'actual_delivery_in_days',
        datediff(col('order_delivered_customer_date'), purchased_at),
    )
)

# Finally reduce the three remaining timestamp columns to plain dates.
for date_column in (
    'order_purchase_timestamp',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
):
    silver_orders = silver_orders.withColumn(date_column, to_date(col(date_column)))




In [90]:
# Clean up order_items.
# De-duplication runs first, so rows are compared on every column
# (including 'shipping_limit_date') before that column is removed.
silver_order_items = (
    silver_order_items
    .dropDuplicates()
    .drop('shipping_limit_date')
)





In [91]:
##### Clean up order_payments: one summary row per order_id.

# Row count and summed payment value per order, computed in one aggregation
# instead of separate aggregations that are joined back together.
payment_totals = silver_order_payments.groupBy('order_id').agg(
    count('order_id').alias('count_values'),
    sum('payment_value').alias('total_amount'),
)

# Mode of 'payment_type' per order. The previous first(payment_type) returned
# whichever row Spark happened to see first, which is neither the mode nor
# deterministic. Here each (order_id, payment_type) pair is counted and the
# pair with the highest count is kept; max() over a struct compares
# type_count first and payment_type second, so ties resolve deterministically
# to the greatest payment_type value.
payment_type_mode = (
    silver_order_payments
    .groupBy('order_id', 'payment_type')
    .agg(count('*').alias('type_count'))
    .groupBy('order_id')
    .agg(expr('max(struct(type_count, payment_type))').alias('top_type'))
    .select('order_id', col('top_type.payment_type').alias('payment_type'))
)

# Combine the results, keeping the original column order:
# order_id, count_values, payment_type, total_amount.
silver_order_payments_aggregate = (
    payment_totals
    .join(payment_type_mode, on='order_id')
    .select('order_id', 'count_values', 'payment_type', 'total_amount')
)





In [92]:
#### Create the fact table: one row per order item, enriched with per-order payment totals.
orders_fact = (
    silver_orders
    # 1. Left joins keep orders that have no items and/or no payments.
    .join(silver_order_items, on='order_id', how='left')
    .join(silver_order_payments_aggregate, on='order_id', how='left')
    # 2. An in-progress order without any product row is flagged 'unavailable'.
    .withColumn(
        'order_status',
        when(
            (col('order_status') == 'in-progress') & col('product_id').isNull(),
            'unavailable',
        ).otherwise(col('order_status')),
    )
    # 3. Helper columns that are not part of the fact table.
    .drop('order_item_id', 'count_values')
    # 4. Final column names. The former rename of
    #    'order_estimated_delivery_date' to itself was a no-op and is removed.
    # NOTE(review): 'seller_zip_code_prefix' is not visibly produced by the
    # joined inputs; withColumnRenamed silently does nothing when the column
    # is absent — confirm this rename is still needed.
    .withColumnRenamed('seller_zip_code_prefix', 'seller_zip_code')
    .withColumnRenamed('order_purchase_timestamp', 'order_purchase_date')
    .withColumnRenamed('order_delivered_customer_date', 'order_delivered_date')
    .withColumnRenamed('total_amount', 'total_amount_per_order_id')
)

# Despite its name this is a Glue DynamicFrame, as required by write_dynamic_frame.
spark_orders_fact = DynamicFrame.fromDF(orders_fact, glueContext)




In [93]:
# Write the order fact table to the gold bucket as Parquet.
# NOTE(review): presumably each run adds new files under this prefix rather
# than replacing existing ones — confirm before re-running.
datasink1 = glueContext.write_dynamic_frame.from_options(
    frame=spark_orders_fact,
    connection_type="s3",
    connection_options={"path": "s3://ecommerce-gold/fact_order/"},
    format="parquet",
)





In [94]:
# Convert 'review_creation_date' to a date.
# to_date() without a format already follows the cast-to-date rules, so the
# extra .cast('date') wrapped inside it was redundant and has been removed.
silver_order_reviews = silver_order_reviews.withColumn(
    'review_creation_date',
    to_date(col('review_creation_date')),
)

# DynamicFrame wrapper required by write_dynamic_frame.
spark_silver_order_reviews = DynamicFrame.fromDF(silver_order_reviews, glueContext)




In [95]:
# Write the order-review dimension to the gold bucket as Parquet.
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=spark_silver_order_reviews,
    connection_type="s3",
    connection_options={"path": "s3://ecommerce-gold/dim_order_review/"},
    format="parquet",
)





In [96]:
# A customer row is kept only when all of these columns are populated.
required_columns = ['customer_id', 'customer_zip_code', 'customer_city', 'customer_state']

silver_customers = (
    silver_customers
    .drop('customer_unique_id')          # not carried into the dimension
    .dropna(subset=required_columns)     # same operation as .na.drop(subset=...)
)

# DynamicFrame wrapper required by write_dynamic_frame.
spark_silver_customers = DynamicFrame.fromDF(silver_customers, glueContext)




In [97]:
# Write the customer dimension to the gold bucket as Parquet.
datasink3 = glueContext.write_dynamic_frame.from_options(
    frame=spark_silver_customers,
    connection_type="s3",
    connection_options={"path": "s3://ecommerce-gold/dim_customers/"},
    format="parquet",
)





In [98]:
# Build the product dimension in pandas.
# NOTE(review): toPandas() pulls the whole product table onto the driver;
# fine for a small dimension, revisit if the table grows.
# NOTE(review): the last line rebinds `silver_product` from a Spark DataFrame
# to a DynamicFrame, so re-running this cell without re-running the load cell
# would call toPandas() on a DynamicFrame and presumably fail — confirm.
silver_product_df = silver_product.toPandas().drop(
    columns=[
        'product_category_name',
        'product_name_lenght',
        'product_description_lenght',
        'product_photos_qty',
    ]
)

# Fill each missing physical measurement with the mean of its product category.
# One loop replaces four copy-pasted statements; the per-column logic is unchanged.
for measurement in (
    'product_weight_g',
    'product_length_cm',
    'product_height_cm',
    'product_width_cm',
):
    category_mean = (
        silver_product_df
        .groupby('product_category_name_english')[measurement]
        .transform('mean')
    )
    silver_product_df[measurement] = silver_product_df[measurement].fillna(category_mean)

silver_product_df = silver_product_df.rename(
    columns={"product_category_name_english": "product_category"}
)

# Back to Spark, then wrap as a DynamicFrame for write_dynamic_frame.
silver_product = spark.createDataFrame(silver_product_df)
silver_product = DynamicFrame.fromDF(silver_product, glueContext)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [99]:
# Write the product dimension to the gold bucket as Parquet.
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=silver_product,
    connection_type="s3",
    connection_options={"path": "s3://ecommerce-gold/dim_product/"},
    format="parquet",
)





In [100]:
# Write the seller dimension to the gold bucket as Parquet.
# `silver_seller` is passed through as loaded from the catalog.
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame=silver_seller,
    connection_type="s3",
    connection_options={"path": "s3://ecommerce-gold/dim_seller/"},
    format="parquet",
)



