In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('Olist')\
.getOrCreate()

In [6]:
#path for our data
hdfs_path = '/data/olist/'

In [7]:
customer_df = spark.read.csv(hdfs_path+'olist_customers_dataset.csv', header = True, inferSchema =True)
geolocation_df = spark.read.csv(hdfs_path+'olist_geolocation_dataset.csv', header = True, inferSchema =True)
order_items_df = spark.read.csv(hdfs_path+'olist_order_items_dataset.csv', header = True, inferSchema =True)
order_payments_df = spark.read.csv(hdfs_path + 'olist_order_payments_dataset.csv' ,header = True, inferSchema = True)
order_reviews_df = spark.read.csv(hdfs_path+'olist_order_reviews_dataset.csv', header = True, inferSchema =True)
orders_df = spark.read.csv(hdfs_path+'olist_orders_dataset.csv', header = True, inferSchema =True)
products_df = spark.read.csv(hdfs_path+'olist_products_dataset.csv', header = True, inferSchema =True)
sellers_df = spark.read.csv(hdfs_path+'olist_sellers_dataset.csv', header = True, inferSchema =True)
category_name_df = spark.read.csv(hdfs_path+'product_category_name_translation.csv', header = True, inferSchema =True)

                                                                                

In [8]:
#caching frequenlty used data for better performance
orders_df.cache()
customer_df.cache()
order_items_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [12]:
from pyspark.sql.functions import broadcast

In [9]:
order_items_joined_df = orders_df.join(order_items_df,'order_id','inner')

In [10]:
order_items_products_df = order_items_joined_df.join(products_df,'product_id', 'inner')

In [13]:
order_items_products_sellers_df = order_items_products_df.join(broadcast(sellers_df), 'seller_id','inner')

In [14]:
#this is the major table we have formed next extra data i can add by left join not inner
full_orders_df = order_items_products_sellers_df.join(customer_df,'customer_id','inner')

In [15]:
#Adding geolocation data

full_orders_df = full_orders_df.join(broadcast(geolocation_df),full_orders_df.customer_zip_code_prefix == geolocation_df.geolocation_zip_code_prefix,'left')

In [16]:
#adding reviews

full_orders_df = full_orders_df.join(order_reviews_df,'order_id','left')

In [17]:
#adding payment details

full_orders_df = full_orders_df.join(order_payments_df,'order_id','left')

In [18]:
full_orders_df.cache()

25/08/28 15:45:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

Now we got a table containing all the data so we can perfrom task upon this

#### Q1. Total revenue per seller

In [19]:
from pyspark.sql.functions import *

In [20]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [21]:
seller_revenue_df = full_orders_df.groupBy('seller_id').agg(sum('price').alias('total_revenue'))

In [22]:
seller_revenue_df.show()



+--------------------+--------------------+
|           seller_id|       total_revenue|
+--------------------+--------------------+
|8e6cc767478edae94...|  1145757.4000000567|
|4d600e08ecbe08258...|   436434.2299999993|
|cb5ff1b9715e99589...|             13235.0|
|62de60d81c55c29d7...|              8343.0|
|acadd4d36859671cb...|            262034.0|
|33ab10be054370c25...|    45320.7000000006|
|b76dba6c951ab00dc...|   302582.6600000196|
|33cbbec1e7e1044aa...|   190508.3199999999|
|7a67c85e85bb2ce85...|2.0312794890029624E7|
|3d8fa2f5b647373c8...|  458520.11999999057|
|9d213f303afae4983...|   2321.400000000004|
|e5c84227854980f1d...|   3851.539999999998|
|ca77545ca4d2dfd14...|   76038.90000000007|
|ee2fbacc2fc3794e6...|  175773.57000000187|
|d2374cbcbb3ca4ab1...|   3375517.549997773|
|f1b854361f4e15d58...|   94283.69999999962|
|7357b52d27cbaa90f...|  417092.88000001025|
|26d8a1c7c75d51304...|   1525399.499999968|
|238fac594e170b59c...|            480044.0|
|f4a04f7be452aa3bb...|  208083.8

                                                                                

#### Q2.Total orders per customer

In [23]:
customer_order_count = full_orders_df.groupBy('customer_id')\
.agg(count('order_id').alias('total_orders'))\
.orderBy(desc('total_orders'))

In [24]:
customer_order_count.show(5)



+--------------------+------------+
|         customer_id|total_orders|
+--------------------+------------+
|351e40989da90e704...|       11427|
|50920f8cd0681fd86...|       10752|
|9b43e2a62de9bab3a...|        8556|
|270c23a11d024a44c...|        8001|
|5c87184371002d49e...|        6876|
+--------------------+------------+
only showing top 5 rows



                                                                                

#### Q3. Average review score per seller

In [25]:
avg_review = full_orders_df.groupBy('seller_id')\
.agg(avg('review_score').alias('avg_review_score'))\
.orderBy(desc('avg_review_score'))

In [26]:
avg_review.show(5)



+--------------------+----------------+
|           seller_id|avg_review_score|
+--------------------+----------------+
|31e60bf8d103ce479...|             5.0|
|1a8e2d9c38b84a970...|             5.0|
|2b2fed75b8e5ea3a0...|             5.0|
|33ab10be054370c25...|             5.0|
|fd312b6bf05efac6c...|             5.0|
+--------------------+----------------+
only showing top 5 rows



                                                                                

#### Q4. Most Sold products (Top 10)

In [27]:
top_prod = full_orders_df.groupBy('product_id')\
.agg(count('order_id').alias('number_of_sells'))\
.orderBy('number_of_sells')

In [28]:
top_prod.show(5)



+--------------------+---------------+
|          product_id|number_of_sells|
+--------------------+---------------+
|9da1c863c2389407f...|              1|
|0c3e039a06ef40bce...|              1|
|acd9f2e9edc814a31...|              1|
|c35614188d97c66c2...|              1|
|de7034010d180a33f...|              1|
+--------------------+---------------+
only showing top 5 rows



                                                                                

## optimizing joins for data integration

To optimize our joins we can use broadcasting for the smaller dataframes like geolocation, seller(only for our dataset)

## Windows function

#### Rank top selling products per seller

In [29]:
from pyspark.sql.window import Window

In [30]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [31]:
top_selling_prod_df = full_orders_df.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)

In [32]:
top_selling_prod_df.select('seller_id','price','rank').show()

[Stage 45:>                                                         (0 + 1) / 1]

+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

## Advanced aggregation and enrichment

### Q1.Total revenue and average order value(AOV) per customer

In [33]:
customer_spending_df = full_orders_df.groupBy('customer_id')\
.agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_spent'),
    round(avg('price'),2).alias('AOV')
)\
.orderBy(desc('total_spent'))

In [34]:
customer_spending_df.show(5)



+--------------------+------------+-----------+------+
|         customer_id|total_orders|total_spent|   AOV|
+--------------------+------------+-----------+------+
|d3e82ccec3cb5f956...|        6876|  6662844.0| 969.0|
|df55c14d1476a9a34...|         743|  3565657.0|4799.0|
|fe5113a38e3575c04...|        2292|  3293604.0|1437.0|
|ec5b2ba62e5743423...|        1428|  2556120.0|1790.0|
|63b964e79dee32a35...|        6072|  2501664.0| 412.0|
+--------------------+------------+-----------+------+
only showing top 5 rows





#### Q2. seller performace metrics(Revenue, Average revenue, Order Count)

In [35]:
seller_performance = full_orders_df.groupBy('seller_id')\
.agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_revenue'),
    round(avg('review_score'),2).alias('avergage_review_score'),
    round(stddev('price'),2).alias('price_variability')
)\
.orderBy(desc('total_revenue'))

In [36]:
seller_performance.show(5)



+--------------------+------------+--------------------+---------------------+-----------------+
|           seller_id|total_orders|       total_revenue|avergage_review_score|price_variability|
+--------------------+------------+--------------------+---------------------+-----------------+
|4869f7a5dfa277a7d...|      184587|3.6138717319998816E7|                 4.09|           111.65|
|53243585a1d6dc264...|       54514| 3.429159295000016E7|                 4.12|           499.65|
|4a3ca9315b744ce9f...|      330661| 3.375957084003399E7|                 3.77|            59.37|
|7c67e1448b00f6e96...|      233306|3.2282321790014144E7|                 3.42|            50.39|
|fa1c13f2614d7b5c4...|       87686| 3.013938631000357E7|                 4.38|            307.7|
+--------------------+------------+--------------------+---------------------+-----------------+
only showing top 5 rows



                                                                                

#### Q3. Product popularity metrics

In [37]:
from pyspark.sql.functions import collect_set

In [38]:
produc_metrics = full_orders_df.groupBy('product_id')\
.agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('prod_revenue'),
    round(avg('price'),2).alias('avg_price'),
    round(stddev('price'),2).alias('price_volatility'),\
    collect_set('seller_id').alias('unique_sellers')
)\
.orderBy(desc('total_orders'))

In [39]:
produc_metrics.show(5)



+--------------------+------------+------------------+---------+----------------+--------------------+
|          product_id|total_orders|      prod_revenue|avg_price|price_volatility|      unique_sellers|
+--------------------+------------+------------------+---------+----------------+--------------------+
|aca2eb7d00ea1a7b8...|       86740| 6164630.299996043|    71.07|            3.17|[955fee9216a65b61...|
|422879e10f4668299...|       81110| 4442791.509997541|    54.77|            4.46|[1f50f920176fa81d...|
|99a4788cb24856965...|       78775|6921762.7099960325|    87.87|            4.08|[4a3ca9315b744ce9...|
|389d119b48cf3043d...|       60248|3280533.1299988558|    54.45|            4.37|[1f50f920176fa81d...|
|d1c427060a0f73f6b...|       59274| 8220103.330002677|   138.68|           16.58|[a1043bafd471dff5...|
+--------------------+------------+------------------+---------+----------------+--------------------+
only showing top 5 rows



                                                                                

#### Q. Customer retention analysis(first and last order)

In [40]:
customer_retention_df = full_orders_df.groupBy('customer_id')\
.agg(
    first('order_purchase_timestamp').alias('first_order_date'),
    last('order_purchase_timestamp').alias('last_order_date'),
    count('order_id').alias('total_orders'),
    round(avg('price'),2).alias('aov')
)\
.orderBy(desc('total_orders'))

In [41]:
customer_retention_df.show(5)



+--------------------+-------------------+-------------------+------------+-----+
|         customer_id|   first_order_date|    last_order_date|total_orders|  aov|
+--------------------+-------------------+-------------------+------------+-----+
|351e40989da90e704...|2017-07-13 10:42:37|2017-07-13 10:42:37|       11427|85.99|
|50920f8cd0681fd86...|2018-01-27 11:28:32|2018-01-27 11:28:32|       10752|43.82|
|9b43e2a62de9bab3a...|2017-05-25 22:27:50|2017-05-25 22:27:50|        8556| 26.4|
|270c23a11d024a44c...|2017-08-08 20:26:31|2017-08-08 20:26:31|        8001|36.59|
|5c87184371002d49e...|2018-01-05 19:15:37|2018-01-05 19:15:37|        6876|12.49|
+--------------------+-------------------+-------------------+------------+-----+
only showing top 5 rows



                                                                                

# Extended enrichment

In [43]:
#Order status flags
full_orders_df = full_orders_df.withColumn('is_delivered', when(col('order_status')=='delivered',lit(1)).otherwise(lit(0)))\
.withColumn('is_canceled', when(col('order_status')=='canceled',lit(1)).otherwise(lit(0)))

In [45]:
full_orders_df.select('order_status','is_delivered','is_canceled').show()

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
|   delivered|           1|          0|
+------------+------------+-----------+
only showing top 20 rows



In [46]:
full_orders_df.where(full_orders_df['order_status']=='canceled').select('order_status','is_delivered','is_canceled').show(5)

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
|    canceled|           0|          1|
+------------+------------+-----------+
only showing top 5 rows



In [47]:
#order revenue calculation
full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+col('freight_value'))

In [48]:
full_orders_df.select('price','freight_value','order_revenue').show()

+-----+-------------+-------------+
|price|freight_value|order_revenue|
+-----+-------------+-------------+
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
|29.99|         8.72|        38.71|
+-----+-------------+-------------+
only showing top 20 rows



In [49]:
customer_spending_df.show(5)



+--------------------+------------+-----------+------+
|         customer_id|total_orders|total_spent|   AOV|
+--------------------+------------+-----------+------+
|d3e82ccec3cb5f956...|        6876|  6662844.0| 969.0|
|df55c14d1476a9a34...|         743|  3565657.0|4799.0|
|fe5113a38e3575c04...|        2292|  3293604.0|1437.0|
|ec5b2ba62e5743423...|        1428|  2556120.0|1790.0|
|63b964e79dee32a35...|        6072|  2501664.0| 412.0|
+--------------------+------------+-----------+------+
only showing top 5 rows



                                                                                

In [50]:
#customer segmentaion based on spending
customer_spending_df = customer_spending_df.withColumn(
    'customer_segment',
    when((col('AOV')<1200) & (col('AOV')>=500), 'Medium_value')
    .otherwise('low_value')
)

In [51]:
customer_spending_df.show(5)



+--------------------+------------+-----------+------+----------------+
|         customer_id|total_orders|total_spent|   AOV|customer_segment|
+--------------------+------------+-----------+------+----------------+
|d3e82ccec3cb5f956...|        6876|  6662844.0| 969.0|    Medium_value|
|df55c14d1476a9a34...|         743|  3565657.0|4799.0|       low_value|
|fe5113a38e3575c04...|        2292|  3293604.0|1437.0|       low_value|
|ec5b2ba62e5743423...|        1428|  2556120.0|1790.0|       low_value|
|63b964e79dee32a35...|        6072|  2501664.0| 412.0|       low_value|
+--------------------+------------+-----------+------+----------------+
only showing top 5 rows



                                                                                

In [52]:
full_orders_df = full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'),'customer_id',how='left')

In [53]:
full_orders_df.select('customer_id','customer_segment').show(5)

                                                                                

+--------------------+----------------+
|         customer_id|customer_segment|
+--------------------+----------------+
|9ef432eb625129730...|       low_value|
|9ef432eb625129730...|       low_value|
|9ef432eb625129730...|       low_value|
|9ef432eb625129730...|       low_value|
|9ef432eb625129730...|       low_value|
+--------------------+----------------+
only showing top 5 rows



## Hourly order distribution

In [54]:
full_orders_df.select('order_purchase_timestamp').show()

+------------------------+
|order_purchase_timestamp|
+------------------------+
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
|     2017-10-02 10:56:33|
+------------------------+
only showing top 20 rows



In [55]:
full_orders_df = full_orders_df.withColumn('hour_of_day',expr('hour(order_purchase_timestamp)'))

In [56]:
full_orders_df.select('order_purchase_timestamp','hour_of_day').show()

+------------------------+-----------+
|order_purchase_timestamp|hour_of_day|
+------------------------+-----------+
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
|     2017-10-02 10:56:33|         10|
+------------------------+-----------+
only showing top 20 rows



In [58]:
# weekday vs weekend orders
full_orders_df = full_orders_df.withColumn('order_day_type',\
                                           when(dayofweek('order_purchase_timestamp').isin(1,7),lit('weekend')).otherwise(lit('weekday')))

In [61]:
full_orders_df.select('order_purchase_timestamp','order_day_type').show(5)

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2017-10-02 10:56:33|       weekday|
|     2017-10-02 10:56:33|       weekday|
|     2017-10-02 10:56:33|       weekday|
|     2017-10-02 10:56:33|       weekday|
|     2017-10-02 10:56:33|       weekday|
+------------------------+--------------+
only showing top 5 rows



In [63]:
full_orders_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null