**Data Integration and Optimization**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.appName("e-comm data integration and optimization")\
.getOrCreate()

25/06/18 13:28:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
spark

In [3]:
hdfs_path = "/data/olist/"

In [4]:
customers_df = spark.read.csv(hdfs_path + "olist_customers_dataset.csv", header = True, inferSchema= True)

                                                                                

In [5]:
product_category_df = spark.read.csv(hdfs_path + "product_category_name_translation.csv", header = True, inferSchema= True)
geolocation_df = spark.read.csv(hdfs_path + "olist_geolocation_dataset.csv", header = True, inferSchema= True)
order_items_df = spark.read.csv(hdfs_path + "olist_order_items_dataset.csv", header = True, inferSchema= True)
payments_df = spark.read.csv(hdfs_path + "olist_order_payments_dataset.csv", header = True, inferSchema= True)
reviews_df = spark.read.csv(hdfs_path + "olist_order_reviews_dataset.csv", header = True, inferSchema= True)
orders_df = spark.read.csv(hdfs_path + "olist_orders_dataset.csv", header = True, inferSchema= True)
sellers_df = spark.read.csv(hdfs_path + "olist_sellers_dataset.csv", header = True, inferSchema= True)
products_df = spark.read.csv(hdfs_path + "olist_products_dataset.csv", header = True, inferSchema= True)

                                                                                

In [6]:
orders_df.cache()
customers_df.cache()
order_items_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [7]:
order_items_joined_df = orders_df.join(order_items_df,"order_id","inner")

In [8]:
order_items_products_df = order_items_joined_df.join(products_df,"product_id","inner") 

In [9]:
orders_items_product_seller_df = order_items_products_df.join(sellers_df,"seller_id","inner")

In [10]:
full_orders_df = orders_items_product_seller_df.join(customers_df,"customer_id","inner")

In [11]:
full_orders_df.show(5)

25/06/18 13:29:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-------------+------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|           seller_id|          product_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height

                                                                                

In [12]:
# Geolocation Data

full_orders_df = full_orders_df.join(geolocation_df,full_orders_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix,"left")

In [13]:
full_orders_df = full_orders_df.join(reviews_df,"order_id","left")

In [14]:
full_orders_df =full_orders_df.join(payments_df,"order_id","left")

In [15]:
full_orders_df.cache()

DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [16]:
full_orders_df.show(5)



+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_approved

                                                                                

In [17]:
# Total Revenue per seller

from pyspark.sql.functions import *

seller_revenue_df = full_orders_df.groupBy("seller_id").agg(sum("price").alias("Total Revenue"))

In [18]:
seller_revenue_df.show(5)



+--------------------+------------------+
|           seller_id|     Total Revenue|
+--------------------+------------------+
|d650b663c3b5f6fb3...|         2253869.1|
|cd06602b43d8800bd...|353150.98000000033|
|3c487ae8f8d7542be...|1618845.7000000055|
|d354c38a7182125a7...|318455.87000000104|
|e9779976487b77c6d...| 6293200.690000011|
+--------------------+------------------+
only showing top 5 rows



                                                                                

In [19]:
# Total Orders Per Customer

Total_orders_per_customer = full_orders_df.groupBy("customer_id").agg(count("order_id").alias("total_orders"))



In [20]:
Total_orders_per_customer.show(1)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|f35e5fd801be940cb...|          65|
+--------------------+------------+
only showing top 1 row



                                                                                

In [21]:
# Average Review score per seller

average_review_score_per_seller = full_orders_df.groupBy("seller_id").agg(avg("review_score").alias("seller average score"))


In [22]:
average_review_score_per_seller.show(1)



+--------------------+--------------------+
|           seller_id|seller average score|
+--------------------+--------------------+
|7a67c85e85bb2ce85...|   4.258920734844587|
+--------------------+--------------------+
only showing top 1 row



                                                                                

In [23]:
# top customer by spending

top_customer_by_spending = full_orders_df.groupBy("customer_id").agg(sum("price").alias("Total Sales")).orderBy("Total Sales",ascending= False)



In [24]:
top_customer_by_spending.show(5)



+--------------------+-----------+
|         customer_id|Total Sales|
+--------------------+-----------+
|d3e82ccec3cb5f956...|  6662844.0|
|df55c14d1476a9a34...|  3565657.0|
|fe5113a38e3575c04...|  3293604.0|
|ec5b2ba62e5743423...|  2556120.0|
|63b964e79dee32a35...|  2501664.0|
+--------------------+-----------+
only showing top 5 rows



                                                                                

In [25]:
full_orders_df.show(1)

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_approved_

In [26]:
orders_items_product_seller_df = order_items_products_df.join(broadcast(sellers_df),"seller_id","inner")

In [27]:
#Top 10 Most sold products

top_10_most_sold_products = full_orders_df.groupBy("product_id").agg(count("order_id").alias("Top_10_sold_products")).orderBy("Top_10_sold_products",ascending = False).limit(10)

In [28]:
top_10_most_sold_products.show(10)



+--------------------+--------------------+
|          product_id|Top_10_sold_products|
+--------------------+--------------------+
|aca2eb7d00ea1a7b8...|               86740|
|422879e10f4668299...|               81110|
|99a4788cb24856965...|               78775|
|389d119b48cf3043d...|               60248|
|d1c427060a0f73f6b...|               59274|
|368c6c730842d7801...|               58358|
|53759a2ecddad2bb8...|               52654|
|53b36df67ebb7c415...|               52105|
|154e7e31ebfa09220...|               42700|
|3dd2a17168ec895c7...|               40787|
+--------------------+--------------------+



                                                                                

**window function and Ranking**

In [29]:
from pyspark.sql.window import Window

In [30]:

#Rank Top Selling Products per Seller

window_spec = Window.partitionBy("seller_id").orderBy(desc("price"))

In [31]:

top_seller_products_df = full_orders_df.withColumn("rank",rank().over(window_spec)).filter(col("rank")<=5)

In [32]:
top_seller_products_df.select("seller_id","price","rank").show(5)

[Stage 84:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 5 rows



                                                                                

***Extended Enrichment***

**Advance Aggregation and Enrichment**

In [33]:
# Total Revenue & Average order value per customer

customer_spending_df = full_orders_df.groupBy("customer_id")\
.agg(count("order_id").alias("total_orders"),
   sum("price").alias("total_spent"),
    round(avg("price"),2).alias("ADV")
)\
.orderBy(desc("total_spent"))

In [34]:
customer_spending_df.show(5)



+--------------------+------------+-----------+------+
|         customer_id|total_orders|total_spent|   ADV|
+--------------------+------------+-----------+------+
|d3e82ccec3cb5f956...|        6876|  6662844.0| 969.0|
|df55c14d1476a9a34...|         743|  3565657.0|4799.0|
|fe5113a38e3575c04...|        2292|  3293604.0|1437.0|
|ec5b2ba62e5743423...|        1428|  2556120.0|1790.0|
|63b964e79dee32a35...|        6072|  2501664.0| 412.0|
+--------------------+------------+-----------+------+
only showing top 5 rows



                                                                                

In [35]:
# Seller Performance Matrix (revenue, average review, order count)

seller_performance_df = full_orders_df.groupBy("seller_id")\
.agg(count("order_id").alias("total_orders"),
    sum("price").alias("total_revenue"),
    round(avg("review_score"),2).alias("avg_review_score"),
    round(stddev("price"),2).alias("price_variability")
                        )\
                        .orderBy(desc("total_revenue"))

In [36]:
seller_performance_df.show(5)



+--------------------+------------+--------------------+----------------+-----------------+
|           seller_id|total_orders|       total_revenue|avg_review_score|price_variability|
+--------------------+------------+--------------------+----------------+-----------------+
|4869f7a5dfa277a7d...|      184587| 3.613871731999996E7|            4.09|           111.65|
|53243585a1d6dc264...|       54514| 3.429159294999997E7|            4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084000012E7|            3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321789999783E7|            3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686|3.0139386309999984E7|            4.38|            307.7|
+--------------------+------------+--------------------+----------------+-----------------+
only showing top 5 rows



                                                                                

In [37]:
# Product Popularity Metrix

product_metrix_df = full_orders_df.groupBy("product_id")\
.agg(
    count("order_id").alias("total_sales"),
    sum("price").alias("total_revenue"),
    round(avg("price"),2).alias("avg_price"),
    round(stddev("price"),2).alias("seller_volatility"),\
    collect_set("seller_id").alias("unique_seller")    
)\
.orderBy(desc("total_sales"))


In [38]:
product_metrix_df.show(5)



+--------------------+-----------+------------------+---------+-----------------+--------------------+
|          product_id|total_sales|     total_revenue|avg_price|seller_volatility|       unique_seller|
+--------------------+-----------+------------------+---------+-----------------+--------------------+
|aca2eb7d00ea1a7b8...|      86740| 6164630.300000013|    71.07|             3.17|[955fee9216a65b61...|
|422879e10f4668299...|      81110| 4442791.510000016|    54.77|             4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|      78775|6921762.7100000195|    87.87|             4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|      60248|  3280533.13000001|    54.45|             4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|      59274| 8220103.329999991|   138.68|            16.58|[a1043bafd471dff5...|
+--------------------+-----------+------------------+---------+-----------------+--------------------+
only showing top 5 rows



                                                                                

In [39]:
# Customer Retention analysis (first and last order)

customer_retention_df = full_orders_df.groupBy("customer_id")\
.agg(
    first("order_purchase_timestamp").alias("first_order_date"),
    last("order_purchase_timestamp").alias("last_order_date"),
    count("order_id").alias("total_orders"),
    round(avg("price"),2).alias("aov")
)\
.orderBy(desc("total_orders"))

In [40]:
customer_retention_df.show(5)



+--------------------+-------------------+-------------------+------------+-----+
|         customer_id|   first_order_date|    last_order_date|total_orders|  aov|
+--------------------+-------------------+-------------------+------------+-----+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427|85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752|43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556| 26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001|36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876|12.49|
+--------------------+-------------------+-------------------+------------+-----+
only showing top 5 rows



                                                                                

In [41]:
#Order Status Flags

full_orders_df = full_orders_df.withColumn("is_delivered",when(col("order_status")=="delivered",lit(1)).otherwise(lit(0)))\
.withColumn("is_canceled",when(col("order_status")=="canceled",lit(1)).otherwise(lit(0)))

In [42]:
full_orders_df.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+-----------+------------+--------------------+------------------------+-------------+--------------+---------------------------+-------------------+-------------------+----------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+------------+-----------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_t

In [43]:
full_orders_df.select("order_status","is_delivered","is_canceled").show(5)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
+------------+------------+-----------+
only showing top 5 rows



In [44]:
# Order Revenue Calculation

full_orders_df = full_orders_df.withColumn("order_revenue",col("price")+col("freight_value")) 

In [45]:
full_orders_df.select("order_revenue","price","freight_value").show(5)

+------------------+-----+-------------+
|     order_revenue|price|freight_value|
+------------------+-----+-------------+
|36.449999999999996|28.99|         7.46|
|36.449999999999996|28.99|         7.46|
|36.449999999999996|28.99|         7.46|
|36.449999999999996|28.99|         7.46|
|36.449999999999996|28.99|         7.46|
+------------------+-----+-------------+
only showing top 5 rows



**Data Saving Layer**

In [49]:
!hadoop fs -mkdir /olist/

In [51]:
!hadoop fs -mkdir /olist/processed/

In [52]:
!hadoop fs -ls

Found 1 items
drwxr-xr-x   - root hadoop          0 2025-06-18 13:28 .sparkStaging


In [53]:
# Savee as Parquet in Hadoop

full_orders_df.write.mode("overwrite").parquet("/olist/processed/")

                                                                                

In [54]:
# Save as parquet in google cloud storage

full_orders_df.write.mode("overwrite").parquet("gs://ecom_analysis/temp_data")

                                                                                

In [56]:
# Saving data as table

full_orders_df.write.mode("overwrite").saveAsTable("full_orders_df")

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
25/06/18 13:54:09 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [57]:
spark.sql("show tables")

DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [58]:
# Saving as csv

full_orders_df.write.mode("overwrite").option("header","true").csv("/olist/processed/")

                                                                                