In [0]:
%run "./Ecommerce Dataset Schema and Dataframes"

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
orders_df = spark.read.parquet("/Volumes/olist/bronze/Orders")

In [0]:
order_items_df = spark.read.parquet("/Volumes/olist/bronze/Order_Items")

In [0]:
orders_df = orders_df.repartition(16, "order_id")#.cache()
order_items_df = order_items_df.repartition(16, "item_order_id")#.cache()

In [0]:
#orders_df.count()
#order_items_df.count()

In [0]:
products_df = spark.read.parquet('/Volumes/olist/bronze/Products')

In [0]:
order_payments_df = spark.read.parquet('/Volumes/olist/bronze/Order_Payments')

In [0]:
order_payments_df = order_payments_df.groupBy("payment_order_id") \
.agg(
    max("payment_sequential").alias("max_payment_sequence"),
    collect_set("payment_type").alias("payment_type"),
    sum("payment_installments").alias("payment_installments"),
    count("*").alias("payment_split_count"),  # number of rows per order
    sum("payment_value").alias("payment_value"),
    sum("payment_value_imputed").alias("payment_value_imputed"),
)

In [0]:
#order_payments_df.cache()
#order_payments_df.count()

In [0]:
order_reviews_df = spark.read.parquet('/Volumes/olist/bronze/Order_Reviews')

In [0]:
order_reviews_df = order_reviews_df.groupBy("review_order_id") \
.agg(
    collect_list("review_score").alias("review_scores"),
    collect_list("review_comment_title").alias("review_titles"),
    collect_list("review_comment_message").alias("review_message"),
    first("review_creation_date").alias("review_creation_date"),
    first("review_answer_timestamp").alias("review_answer_timestamp")
)

In [0]:
#order_reviews_df.cache()
#order_reviews_df.count()

In [0]:
# Keep exactly one geolocation row per zip code prefix. The customer join below
# is on customer_zip_code_prefix == geolocation_zip_code_prefix, and a left join
# on a non-unique key repeats the left rows. De-duplicating on (lat, lng) alone
# can leave several rows per prefix, inflating every sum computed from final_df.
# The row with the smallest (lat, lng), nulls last, is kept so the choice is
# deterministic. Re-running this cell is a no-op.
_geo_zip_window = (
    Window
    .partitionBy("geolocation_zip_code_prefix")
    .orderBy(col("geolocation_lat").asc_nulls_last(), col("geolocation_lng").asc_nulls_last())
)
geolocation_df = (
    geolocation_df
    .withColumn("_geo_rank", row_number().over(_geo_zip_window))
    .filter(col("_geo_rank") == 1)
    .drop("_geo_rank")
)

In [0]:
orders_items = orders_df.join(order_items_df, on=orders_df.order_id == order_items_df.item_order_id, how='inner')
#orders_items.cache()

In [0]:
from pyspark.sql.functions import broadcast

orders_items_products = orders_items.join(broadcast(products_df), on=orders_items.item_product_id == products_df.product_id, how='left')

In [0]:
orders_items_products_sellers = orders_items_products.join(broadcast(sellers_df), on=orders_items_products.item_seller_id == sellers_df.seller_id, how='left')

In [0]:
orders_items_products_sellers_payments = orders_items_products_sellers.join(order_payments_df, on=orders_items_products_sellers.order_id == order_payments_df.payment_order_id, how='left')

In [0]:
orders_items_products_sellers_payments_customers = orders_items_products_sellers_payments.join(broadcast(customers_df), on=orders_items_products_sellers_payments.order_customer_id == customers_df.customer_id, how='left')

In [0]:
orders_items_products_sellers_payments_customers_reviews = orders_items_products_sellers_payments_customers.join(order_reviews_df, on=orders_items_products_sellers_payments_customers.order_id == order_reviews_df.review_order_id, how='left')

In [0]:
orders_items_products_sellers_payments_customers_reviews_geolocation = orders_items_products_sellers_payments_customers_reviews.join(broadcast(geolocation_df), on=orders_items_products_sellers_payments_customers_reviews.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix, how='left')

In [0]:
final_df = orders_items_products_sellers_payments_customers_reviews_geolocation.join(broadcast(products_name_translation_df), on='product_category_name', how='left')

### **ADD EXTRA COLUMNS TO FINAL DF**

In [0]:
# Total Revenue & Avg order value (AOV) Per Customer

customer_spending = final_df.groupBy('order_customer_id') \
    .agg(
        count('order_id').alias('total_orders'),
            round(sum('price'),2).alias('total_spent'),
                round(avg('price'), 2).alias('AOV')
    ) \
        .orderBy(desc('total_spent'))

In [0]:
quantiles = customer_spending.approxQuantile('AOV', [0.25, 0.75], 0.0)
low_threshold, high_threshold = quantiles[0], quantiles[1]

customer_segmented = customer_spending.withColumn('aov_segment',
    when(col('AOV') >= high_threshold, 'High')
    .when(col('AOV') < low_threshold, 'Low')
    .otherwise('Medium')
)

In [0]:
customer_segmented = customer_segmented.withColumnRenamed("order_customer_id", "order_customer_id_customer_segmented")

In [0]:
# AOV Customer Segment
final_df = final_df.join(broadcast(customer_segmented), on=final_df.order_customer_id == customer_segmented.order_customer_id_customer_segmented, how='left')

In [0]:
def add_feature_columns(final_df):
    """Derive delivery, revenue, payment, review and purchase-time features.

    Every feature is appended with ``withColumn``, which replaces any existing
    column of the same name. Rows are neither added nor removed.

    Args:
        final_df: The joined order-item DataFrame. Columns read: order_status,
            price, freight_value, payment_value, review_titles, review_message,
            review_creation_date, review_answer_timestamp,
            order_delivered_customer_date, order_purchase_timestamp,
            order_estimated_delivery_date.

    Returns:
        A new DataFrame with these columns added: is_delivered, is_canceled,
        order_revenue, is_payment_missing, has_review,
        time_from_delivery_to_review, time_to_response, hour_of_day,
        order_day_type, actual_delivery_time, estimated_delivery_time,
        delivery_delay_time, is_late.
    """
    # Delivery status flags (a null order_status yields 0)
    final_df = (
        final_df
        .withColumn('is_delivered', when(col('order_status') == 'delivered', lit(1)).otherwise(lit(0)))
        .withColumn('is_canceled', when(col('order_status') == 'canceled', lit(1)).otherwise(lit(0)))
    )

    # Total Revenue: price + freight (null if either operand is null)
    final_df = final_df.withColumn('order_revenue', col('price') + col('freight_value'))

    # Missing Payment: no matching payment row, or a null payment_value
    final_df = final_df.withColumn('is_payment_missing', when(col('payment_value').isNull(), 1).otherwise(0))

    # Order Review.
    # review_titles / review_message are null when the order had no review row
    # at all (left-join miss; collect_list itself never returns null).
    # array_contains(null, ...) is null, which previously fell through to
    # 'Has Comment'; such orders are now labelled 'No Review'.
    # The label is 'No Review'. Earlier runs wrote the misspelling 'No Reivew',
    # so downstream filters on that value need updating.
    # NOTE(review): assumes missing titles/messages were imputed as 'No Review'
    # upstream — confirm.
    no_review = col('review_titles').isNull() | (
        array_contains(col('review_titles'), 'No Review')
        & array_contains(col('review_message'), 'No Review')
    )
    final_df = final_df.withColumn('has_review', when(no_review, 'No Review').otherwise('Has Comment'))

    # Review Time Delays, in whole days (datediff compares dates only)
    final_df = (
        final_df
        .withColumn('time_from_delivery_to_review',
                    datediff(col('review_creation_date'), col('order_delivered_customer_date')))
        .withColumn('time_to_response',
                    datediff(col('review_answer_timestamp'), col('review_creation_date')))
    )

    # Hourly Order Distribution: hour of the purchase timestamp (0-23)
    final_df = final_df.withColumn('hour_of_day', hour(col('order_purchase_timestamp')))

    # WeekDay VS WeekEnd Orders: Spark's dayofweek is 1 = Sunday ... 7 = Saturday
    final_df = final_df.withColumn(
        'order_day_type',
        when(dayofweek('order_purchase_timestamp').isin([1, 7]), lit('Weekend')).otherwise(lit('Weekday')),
    )

    # Delivery Time & Time Delays, in days measured from purchase.
    # `when` without `otherwise` yields null, so missing dates propagate as null.
    final_df = (
        final_df
        .withColumn('actual_delivery_time',
                    when(col('order_delivered_customer_date').isNotNull(),
                         datediff('order_delivered_customer_date', 'order_purchase_timestamp')))
        .withColumn('estimated_delivery_time',
                    when(col('order_estimated_delivery_date').isNotNull(),
                         datediff('order_estimated_delivery_date', 'order_purchase_timestamp')))
        .withColumn('delivery_delay_time',
                    when(col('actual_delivery_time').isNotNull() & col('estimated_delivery_time').isNotNull(),
                         col('actual_delivery_time') - col('estimated_delivery_time')))
        # Late = delivered strictly after the estimate; rows without a delay count as not late
        .withColumn('is_late',
                    when(col('delivery_delay_time').isNotNull() & (col('delivery_delay_time') > 0), 1)
                    .otherwise(0))
    )

    return final_df

In [0]:
final_df = add_feature_columns(final_df)

In [0]:
# Partition based on the order_year_month
final_df = final_df.withColumn("order_year_month", date_format("order_purchase_timestamp", "yyyy-MM"))

### **CACHE THE FINAL DATAFRAME (optional — the caching cells below are currently commented out)**

In [0]:
#final_df.cache()

In [0]:
#final_df.count()

In [0]:
#from pyspark import StorageLevel

#final_df = final_df.repartition(16, 'order_id').persist(StorageLevel.MEMORY_AND_DISK)
#final_df.count()  # or better: display(final_df)

### **SAVE FINAL DF DATA TO DELTA FILE FORMAT**

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS olist.silver.final_df

In [0]:
final_df.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema', 'true') \
    .partitionBy('order_year_month') \
    .save('/Volumes/olist/silver/final_df')

### **CREATE TABLE ON FINAL DATAFRAME**

In [0]:
# Full rebuild of the managed gold table, partitioned by purchase month.
# overwriteSchema instead of mergeSchema: the table schema then matches
# final_df exactly. With mergeSchema, dropped columns would linger, and type
# or partitioning changes would be rejected.
final_df.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .partitionBy('order_year_month') \
    .saveAsTable('olist.gold.final_df')