### Importing Libraries

In [1]:
from Projects.OLIST_SALES.config.config import get_environment_info,get_today_date
from pyspark.sql.functions import col,dense_rank,countDistinct,sum,round,to_timestamp,avg,when,count,lit
from pyspark.sql.window import Window

### Read Config File

In [2]:
# Name of the source system this notebook works on.
source_name = "olist"

# get_environment_info() is read positionally:
# index 0 = storage account, 1 = landing container, 2 = raw container.
storage_info = get_environment_info()
storage_account, landing_container, raw_container = (
    storage_info[position] for position in range(3)
)

# Date folder used when building the input paths below; printed so the
# notebook output records which load was processed.
load_date = get_today_date()
print(load_date)

2024-12-07


#### Create and Use Consumption Schema

In [None]:
# The consumption tables written by this notebook live in this schema.
consumption_schema = "olist_cons"

# Create the schema on first run; a no-op when it already exists.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {consumption_schema}")

# Make it the session's current database (optional, but good practice).
spark.sql(f"USE {consumption_schema}")


In [None]:
def _read_raw_table(table_name):
    """Load one table's CSV folder for the current load date.

    Args:
        table_name: Folder name of the table under
            ``null_or_not_null/{load_date}/`` in the raw container.

    Returns:
        A DataFrame whose column names come from the CSV header row.
    """
    table_path = (
        f"abfss://{raw_container}@{storage_account}.dfs.core.windows.net"
        f"/null_or_not_null/{load_date}/{table_name}/"
    )
    # header=True is required: without it Spark names the columns _c0, _c1, ...
    # and every later cell that addresses columns by name (order_id,
    # product_id, ...) fails to resolve them.
    # NOTE(review): assumes the upstream step writes a header row — confirm.
    return spark.read.csv(path=table_path, header=True)


customer_df = _read_raw_table("customer")
orders_df = _read_raw_table("orders")
orders_items_df = _read_raw_table("orders_items")
products_df = _read_raw_table("products")
products_english_df = _read_raw_table("products_english")
orders_payment_df = _read_raw_table("orders_payment")
orders_review_df = _read_raw_table("orders_review")



### Sales Revenue Breakdown

 #### Calculate the total sales revenue generated by each product category over the past year.

In [None]:
# Map every product to the English name of its category.
product_info_df = (
    products_df
    .join(products_english_df,
          on=products_df.product_category_name == products_english_df.product_category_name)
    .select(products_df.product_id, products_english_df.product_category_name_english)
)

# Revenue is taken from the item-level price rather than from payment_value.
# Joining order items to order payments on order_id multiplies rows
# (items x payments per order), so every payment would be summed once per
# item of the order and the category totals would be inflated.
orders_cost_info_df = orders_items_df\
    .select(orders_items_df.product_id, orders_items_df.price)

# NOTE(review): the section heading asks for "the past year", but no date
# filter is applied here. Confirm the intended window and which order date
# column should drive it before adding one.
total_sales_revenue_product_cat_df = (
    product_info_df
    .join(orders_cost_info_df, on=product_info_df.product_id == orders_cost_info_df.product_id)
    .select(product_info_df.product_category_name_english, orders_cost_info_df.price)
    .groupby(col("product_category_name_english").alias("product_category_name"))
    .agg(round(sum("price"), 2).alias("total_sales_revenue"))
)

# Persist the result, replacing any previous run's table.
total_sales_revenue_product_cat_df.write.mode("overwrite").saveAsTable("olist_cons.total_sales_revenue_product_cat")

### Order Delivery Performance

#### Compare the average delivery time (order_approved_at to order_delivered_customer_date) for each city.

In [None]:
# Each order paired with its customer's city and the two timestamps that
# bound the delivery (approval -> delivered to customer).
city_order_dates_df = (
    orders_df
    .join(customer_df, on=orders_df.customer_id == customer_df.customer_id)
    .select(customer_df.customer_city, orders_df.order_approved_at, orders_df.order_delivered_customer_date)
)

# Parse both columns as timestamps, take the difference in epoch seconds and
# express it in whole hours (rounded per order, before averaging).
city_delivery_hours_df = (
    city_order_dates_df
    .withColumn('order_approved_at', to_timestamp('order_approved_at', 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('order_delivered_customer_date', to_timestamp('order_delivered_customer_date', 'yyyy-MM-dd HH:mm:ss'))
    .withColumn("time_diff", col("order_delivered_customer_date").cast("long") - col("order_approved_at").cast("long"))
    .withColumn('DiffInHours', round(col('time_diff') / 3600))
)

# Average per city in hours (2 decimals), plus the same figure in whole days.
avg_delivery_time_city_df = (
    city_delivery_hours_df
    .groupby(col("customer_city").alias("city"))
    .agg(round(avg("DiffInHours"), 2).alias("avg_time (Hours)"))
    .withColumn("avg_time (Days)", round(col("avg_time (Hours)") / 24, 0))
)

# Persist the result, replacing any previous run's table.
avg_delivery_time_city_df.write.mode("overwrite").saveAsTable("olist_cons.avg_delivery_time_city")

#### Identify orders where the actual delivery date exceeded the estimated delivery date and calculate the percentage of late deliveries.

In [None]:
# Each order paired with its customer's city, with the estimated and actual
# delivery dates parsed as timestamps.
city_delivery_dates_df = (
    orders_df
    .join(customer_df, on=orders_df.customer_id == customer_df.customer_id)
    .select(customer_df.customer_city, orders_df.order_estimated_delivery_date, orders_df.order_delivered_customer_date)
    .withColumn('order_estimated_delivery_date', to_timestamp('order_estimated_delivery_date', 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('order_delivered_customer_date', to_timestamp('order_delivered_customer_date', 'yyyy-MM-dd HH:mm:ss'))
)

# An order is late when it reached the customer after the estimated date.
# When either timestamp is NULL the comparison is NULL, so the order is not
# counted as late but is still included in the city's total.
delivered_after_estimate = col("order_delivered_customer_date") > col("order_estimated_delivery_date")

# Per city: number of orders, number of late orders, and the late share as a
# whole-number percentage.
late_orders_df = (
    city_delivery_dates_df
    .groupBy("customer_city")
    .agg(count("*").alias("total_orders_count"),
         count(when(delivered_after_estimate, 1)).alias("late_delivery_count"))
    .withColumn("percentage (%)", round((col("late_delivery_count") / col("total_orders_count")) * 100, 0))
)

# Persist the result, replacing any previous run's table.
late_orders_df.write.mode("overwrite").saveAsTable("olist_cons.late_orders")


### Payment Method Analysis

#### Find out which payment methods (credit card, debit card, etc.) are the most popular among customers.

In [None]:
# Number of payment rows recorded for each payment type.
payment_method_df = orders_payment_df.groupBy("payment_type").count()

# Persist the result, replacing any previous run's table.
payment_method_df.write.mode("overwrite").saveAsTable("olist_cons.payment_type")

### Customer Lifetime Value (CLV)

#### Calculate the total spending of each unique customer and categorize them into high-value, medium-value, and low-value customer segments.

In [None]:
# Total payment value per customer, then a spending tier for each customer.
#
# The tiers are contiguous: [.., 1000) Silver, [1000, 3000) Gold,
# [3000, ..) Premium. The previous strict bounds (< 1000, > 1000 and < 3000,
# > 3000) matched no branch for totals of exactly 1000 or 3000 and left
# customer_type NULL for those customers.
customer_class_df = (
    orders_df
    .join(orders_payment_df, on=orders_df.order_id == orders_payment_df.order_id)
    .select(orders_df.customer_id, orders_payment_df.payment_value)
    .groupby(col("customer_id").alias("customer"))
    .agg(sum("payment_value").alias("total_spending"))
    .withColumn(
        "customer_type",
        when(col("total_spending") < 1000, "Silver Class")
        .when(col("total_spending") < 3000, "Gold Class")
        # Explicit condition instead of otherwise(): a NULL total_spending
        # stays unclassified rather than being labelled Premium.
        .when(col("total_spending") >= 3000, "Premium Class"),
    )
)

# Number of customers in each tier (last expression of the cell).
customer_class_df\
    .groupby(col("customer_type"))\
    .count()
    

### Review Sentiment Analysis

#### Identify the products with the highest number of positive and negative reviews.

In [None]:
# Review rows attached to the order items they belong to. Built once and
# reused below instead of repeating the same join for every aggregate.
product_reviews_df = (
    orders_items_df
    .join(orders_review_df, on=orders_items_df.order_id == orders_review_df.order_id)
    .select(orders_items_df.product_id, orders_review_df.review_id, orders_review_df.review_score)
)

# count of reviews and average review score based on product
# (previously an unassigned mid-cell expression, i.e. dead code; it now has a
# name so it can be displayed or written out)
product_review_summary_df = (
    product_reviews_df
    .groupby(col("product_id"))
    .agg(count("review_id").alias("count_of_reviews"),
         round(avg("review_score"), 2).alias("average_review_score (/5)"))
)


def _top_products_for_score(reviews_df, score, label):
    """Return the product(s) with the most reviews of exactly ``score``.

    Args:
        reviews_df: DataFrame with ``product_id`` and ``review_score`` columns.
        score: Review score to filter on.
        label: Text stored in the ``review`` column of every returned row.

    Returns:
        DataFrame with columns ``product_id``, ``count`` and ``review``.
        Products tied for the highest count are all returned.
    """
    return (
        reviews_df
        .filter(col("review_score") == score)
        .groupby(col("product_id"))
        .count()
        # dense_rank over the whole result: rank 1 = highest count, ties kept.
        .withColumn("drnk", dense_rank().over(Window.orderBy(col("count").desc())))
        .filter(col("drnk") == 1)
        .withColumn("review", lit(label))
        .drop("drnk")
    )


best_order = _top_products_for_score(product_reviews_df, 5, "Best Product- Maximum 5 Rating")
worst_order = _top_products_for_score(product_reviews_df, 1, "Worst Product- Maximum 1 Rating")

# Both frames share the same column layout, so a positional union is safe.
best_worst_product_df = best_order\
    .union(worst_order)

#### Calculate the average review score for each product category and correlate it with sales performance.

In [None]:
# Order items with the (untranslated) category of the product sold.
items_with_category_df = (
    orders_items_df
    .join(products_df, on=orders_items_df.product_id == products_df.product_id)
    .select(orders_items_df.product_id, orders_items_df.order_id, products_df.product_category_name)
)

# Swap the category for its English name.
items_with_english_category_df = (
    items_with_category_df
    .join(products_english_df, on=products_df.product_category_name == products_english_df.product_category_name)
    .select(orders_items_df.product_id, orders_items_df.order_id, products_english_df.product_category_name_english)
)

# Attach the review score of the order each item belongs to.
items_with_review_df = (
    items_with_english_category_df
    .join(orders_review_df, on=orders_items_df.order_id == orders_review_df.order_id)
    .select(orders_items_df.product_id, orders_items_df.order_id,
            products_english_df.product_category_name_english, orders_review_df.review_score)
)

# Per category: number of joined rows with an order_id and the mean review
# score (2 decimals). Left as the cell's last expression, as before.
(
    items_with_review_df
    .groupby(col("product_category_name_english").alias("product_catgory"))
    .agg(count("order_id").alias("count_of_orders"),
         round(avg("review_score"), 2).alias("avg_rating"))
)

### Inventory Management for Sellers

#### Analyze the number of orders and sales volume for each seller to identify which sellers are driving the most sales.

In [None]:
# Orders and sales value per seller.
# orders_items_df is grouped by seller and its order_id is counted; a plain
# count() counts item rows, so an order with several items from the same
# seller would be counted several times. countDistinct gives the number of
# orders, which is what total_orders is meant to report.
seller_orders_price_df = (
    orders_items_df
    .select(orders_items_df.order_id, orders_items_df.product_id, orders_items_df.price, orders_items_df.seller_id)
    .groupby(col("seller_id").alias("seller_id"))
    .agg(countDistinct("order_id").alias("total_orders"),
         round(sum("price"), 2).alias("total_sales"))
)

# Number of distinct products each seller has sold.
seller_volume_df = (
    orders_items_df
    .select(orders_items_df.product_id, orders_items_df.seller_id)
    .groupby(col("seller_id").alias("seller_id"))
    .agg(countDistinct("product_id").alias("total_product_volumne"))
)

# Combine both per-seller summaries and display them.
(
    seller_orders_price_df
    .join(seller_volume_df, on=seller_orders_price_df.seller_id == seller_volume_df.seller_id)
    .select(seller_orders_price_df.seller_id, seller_orders_price_df.total_orders,
            seller_volume_df.total_product_volumne, seller_orders_price_df.total_sales)
    .show()
)

#### Calculate the average time taken for each seller to fulfill an order and identify sellers with consistently late deliveries.

In [27]:
# Order items with the seller's shipping deadline and the order's hand-over /
# delivery dates.
seller_shipments_df = (
    orders_items_df
    .join(orders_df, on=orders_items_df.order_id == orders_df.order_id)
    .select(orders_items_df.seller_id, orders_df.order_id, orders_items_df.shipping_limit_date,
            orders_df.order_delivered_carrier_date, orders_df.order_delivered_customer_date)
)

# Flag an item 'Yes' when the carrier received it after the shipping limit;
# every other row, including rows where the comparison is NULL, is 'OnTime'.
# NOTE(review): despite the column name, a missing carrier date therefore
# counts as on time, not as "no delivery" — confirm that is intended.
# NOTE(review): the two date columns are compared as loaded, without
# to_timestamp — presumably both share one sortable format; verify.
# NOTE(review): the section heading also asks for the average fulfilment time
# per seller, which is not computed here.
flagged_shipments_df = seller_shipments_df.withColumn(
    "late_delivery_or_no_delivery",
    when(col("order_delivered_carrier_date") > col("shipping_limit_date"), 'Yes').otherwise('OnTime'),
)

# Per seller: late, on-time and total item counts plus the late share in
# percent (2 decimals); the result is displayed.
(
    flagged_shipments_df
    .groupby(col("seller_id").alias("seller"))
    .agg(count(when(col("late_delivery_or_no_delivery") == 'Yes', 1)).alias("late_delivers"),
         count(when(col("late_delivery_or_no_delivery") == 'OnTime', 1)).alias("on_time_delivers"),
         count('*').alias("total_deliveries"))
    .withColumn("percentage_of_late_delivers (%)", round((col("late_delivers") / col("total_deliveries")) * 100, 2))
    .show()
)

[Stage 51:>                                                         (0 + 4) / 4]

+--------------------+-------------+----------------+----------------+-------------------------------+
|              seller|late_delivers|on_time_delivers|total_deliveries|percentage_of_late_delivers (%)|
+--------------------+-------------+----------------+----------------+-------------------------------+
|ff063b022a9a0aab9...|            6|              90|              96|                           6.25|
|8e6cc767478edae94...|           41|              73|             114|                          35.96|
|a49928bcdf77c55c6...|           59|              47|             106|                          55.66|
|da7039f29f90ce5b4...|            0|              18|              18|                            0.0|
|062ce95fa2ad4dfae...|           13|              47|              60|                          21.67|
|2009a095de2a2a416...|            0|              12|              12|                            0.0|
|0ea22c1cfbdc755f8...|            5|             264|             269|   

                                                                                