## Module 3 - Data Integration & Aggregation

1. Join datasets efficiently
2. Optimizing Joins
3. Aggregations
4. Caching and optimizing queries for performance

In [0]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder \
    .appName('OListData') \
    .getOrCreate()

In [0]:
customers_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_customers_dataset.csv", header=True, inferSchema=True)

orders_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_orders_dataset.csv", header=True, inferSchema=True)

geolocation_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_geolocation_dataset.csv", header=True, inferSchema=True)

order_items_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_order_items_dataset.csv", header=True, inferSchema=True)

order_payments_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_order_payments_dataset.csv", header=True, inferSchema=True)

order_reviews_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_order_reviews_dataset.csv", header=True, inferSchema=True)

products_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_products_dataset.csv", header=True, inferSchema=True)    

sellers_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/olist_sellers_dataset.csv", header=True        , inferSchema=True)

product_category_name_translation_df = spark.read.csv("/Volumes/workspace/default/data/brazilian_ecommerce/product_category_name_translation.csv", header=True , inferSchema=True)


In [0]:
orders_items_df = (
    orders_df
    .join(order_items_df, on="order_id", how="left")
)
orders_items_df.show(5)


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|949d5b44dbf5de918...|f88197465ea7920ad...|   delivered|     2017-11-18 19:28:06|2017-11-18 19:45:59|         2017-11-22 13:39:59|         

In [0]:
orders_items_products_df = (
    orders_items_df
    .join(
        products_df.select(
            "product_id",
            "product_category_name",
            "product_name_lenght",
            "product_description_lenght",
            "product_photos_qty",
            "product_weight_g",
            "product_length_cm",
            "product_height_cm",
            "product_width_cm"
        ),
        on="product_id",
        how="left"
    )
)
orders_items_products_df.show(5)


+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+--------------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|           seller_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+--------------------+--------------------+------------+------------------------+--------

In [0]:
orders_items_products_sellers_df = (
    orders_items_products_df
    .join(
        sellers_df.select(
            "seller_id",
            "seller_zip_code_prefix",
            "seller_city",
            "seller_state"
        ),
        on="seller_id",
        how="left"
    )
)
orders_items_products_sellers_df.show(5)


+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+---------------+------------+
|           seller_id|          product_id|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|seller_zip_code_prefix|    seller_city|seller_state|
+-----

In [0]:
full_orders_df = (
    orders_items_products_sellers_df
    .join(
        customers_df.select(
            "customer_id",
            "customer_zip_code_prefix",
            "customer_city",
            "customer_state"
        ),
        on="customer_id",
        how="left"
    )
)
full_orders_df.show(5)


+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+---------------+------------+------------------------+--------------------+--------------+
|         customer_id|           seller_id|          product_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_c

In [0]:
full_orders_df.printSchema()

geolocation_df.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [0]:
#deduplicate geolocation 
geo_unique_df = geolocation_df.dropDuplicates(["geolocation_zip_code_prefix"])


In [0]:
full_orders_df = full_orders_df.join(
    geo_unique_df,
    col("customer_zip_code_prefix") == col("geolocation_zip_code_prefix"),
    "left"
)
full_orders_df.show(5)


+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+---------------+------------+------------------------+--------------------+--------------+---------------------------+-------------------+-------------------+--------------------+-----------------+
|         customer_id|           seller_id|          product_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_item_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product

In [0]:
full_orders_df = full_orders_df.join(order_reviews_df,'order_id','left')
full_orders_df = full_orders_df.join(order_payments_df,'order_id','left')

In [0]:
full_orders_df.show(7)

+--------------------+--------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+---------------+------------+------------------------+--------------------+--------------+---------------------------+-------------------+-------------------+--------------------+-----------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+------------------+------------+--------------------+-------------+
|            order_id|         customer_id|           seller_id|          product_id|order_status|order_purchase_timestamp|  order_approved_at|or

In [0]:
from pyspark.sql.functions import sum, col



seller_revenue_df = full_orders_df.groupBy('seller_id').agg(
    sum('price').alias('total_revenue')
)


### Optimized Joins for Data Integration

In [0]:
#Total order per customer

customer_order_count_df= full_orders_df.groupBy('customer_id').agg(
    count('order_id').alias('total_orders')) \
        .orderBy(desc('total_orders'))

In [0]:
# Average Review Score per seller

seller_review_df = full_orders_df.groupBy('seller_id')\
    .agg(avg('review_score').alias('avg_review_score')) \
        .orderBy(desc('avg_review_score'))

In [0]:
#Top 10 most sold products

top_products_df = full_orders_df.groupBy('product_id') \
    .agg(count('order_id').alias('avg_review_score')) \
        .orderBy(desc('avg_review_score')) \
            .limit(10)

In [0]:
# Top 10 customer by spending
top_customers_df = full_orders_df.groupBy('customer_id') \
    .agg(sum('price').alias('total_spending')) \
    .orderBy(desc('total_spending')) \
    .limit(10)


### Window Function and ranking

In [0]:
from pyspark.sql.window import Window



window_spec= Window.partitionBy('seller_id').orderBy(desc('price'))

In [0]:
#rank top selling products per seller

top_seller_products_df = full_orders_df.groupBy('seller_id','product_id') \
    .agg(sum('price').alias('total_revenue')) \
        .orderBy(desc('total_revenue')) \
            .limit(10)

In [0]:
from pyspark.sql.functions import dense_rank

seller_revenue_window = Window.orderBy(col('total_revenue').desc())

seller_revenue_ranked_df = seller_revenue_df.withColumn(
    'revenue_rank',
    dense_rank().over(seller_revenue_window)
)





### Advance Aggregation & enrichment


In [0]:
#Total revenue & average order value(ADV) per customer

customer_spending_df = full_orders_df.groupBy('customer_id')\
  .agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_spent'),
    round(avg('price'),2).alias('ADV')
  ) \
    .orderBy(desc('total_spent'))

In [0]:
#seller performance matrics(revenue ,Average, Review , order count)

seller_performance_df = full_orders_df.groupBy('seller_id')\
  .agg(
    count('order_id').alias('total_orders'),
    sum('price').alias('total_revenue'),
    round(avg('price'),2).alias('avg_review_score'),
    round(stddev('price'),2).alias('price_variability')
  ) \
    .orderBy(desc('total_revenue'))

In [0]:
#Product Popularity Metrics

product_metrics_df = full_orders_df.groupBy('product_id')\
  .agg(
    count('order_id').alias('total_sales'),
    sum('price').alias('total_revenue'),
    round(avg('price'),2).alias('avg_price'),
    round(stddev('price'),2).alias('price_variability'),
    collect_set('seller_id').alias('unique_sellers')
   ) \
    .orderBy(desc('total_revenue'))

In [0]:
from pyspark.sql.functions import year, month

monthly_metrics_df = full_orders_df.withColumn('order_year', year('order_purchase_timestamp')) \
    .withColumn('order_month', month('order_purchase_timestamp')) \
    .groupBy('order_year', 'order_month') \
    .agg(
        sum('price').alias('monthly_revenue'),
        count('order_id').alias('monthly_order_count')
    ) \
    .orderBy('order_year', 'order_month')

In [0]:
#customer Retention Analysis (First & Last order)

customer_retention_df = full_orders_df.groupBy('customer_id') \
    .agg(
        first('order_purchase_timestamp').alias('first_order'),
        last('order_purchase_timestamp').alias('last_order'),
        count('customer_id').alias('total_orders'),
        round(avg('price'),2).alias('ADV')
    ) \
    .orderBy(desc('total_orders'))

### Extended Enrichment

In [0]:
#order status flags

from pyspark.sql.functions import col, when, lit

full_orders_df = (
    full_orders_df
    .withColumn(
        'is_delivered',
        when(col('order_status') == 'delivered', lit(1)).otherwise(lit(0))
    )
    .withColumn(
        'is_canceled',
        when(col('order_status') == 'canceled', lit(1)).otherwise(lit(0))
    )
)

In [0]:
full_orders_df.where(full_orders_df['order_status'] != 'delivered').select('order_status','is_delivered','is_canceled').show()

+------------+------------+-----------+
|order_status|is_delivered|is_canceled|
+------------+------------+-----------+
|  processing|           0|          0|
|    canceled|           0|          1|
|     shipped|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
|    invoiced|           0|          0|
|     shipped|           0|          0|
| unavailable|           0|          0|
|     shipped|           0|          0|
|    invoiced|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
|  processing|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
|     shipped|           0|          0|
| unavailable|           0|          0|
|    invoiced|           0|          0|
+------------+------------+-----------+
only showing top 20 rows


In [0]:
#order reveunue Calculation

full_orders_df = full_orders_df.withColumn('order_revenue',col('price')+ col('freight_value'))

In [0]:
full_orders_df.select('price','freight_value','order_revenue').show()

+------+-------------+------------------+
| price|freight_value|     order_revenue|
+------+-------------+------------------+
|  99.0|        30.53|            129.53|
|1299.0|        77.45|           1376.45|
|  19.9|        16.05|             35.95|
|  45.0|         27.2|              72.2|
| 29.99|         8.72|             38.71|
|  19.9|         8.72|28.619999999999997|
|149.99|        19.77|169.76000000000002|
| 159.9|        19.22|            179.12|
|  49.9|        16.05|             65.95|
| 38.25|        16.11|             54.36|
| 132.4|        14.05|146.45000000000002|
| 147.9|        27.36|            175.26|
|  17.9|        11.85|             29.75|
|  19.9|         12.8|              32.7|
|  76.0|        16.97|             92.97|
|  31.9|        18.23|50.129999999999995|
| 109.9|         8.96|118.86000000000001|
| 118.7|        22.76|            141.46|
| 59.99|        15.17|             75.16|
| 27.99|         15.1|43.089999999999996|
+------+-------------+------------

In [0]:
from pyspark.sql.functions import when, col

customer_spending_df = customer_spending_df.withColumn(
    'customer_segment',
    when(col('ADV') >= 1200, "High-Value")
    .when((col('ADV') < 1200) & (col('ADV') >= 500), "Medium-Value")
    .otherwise("Low-Value")
)

display(customer_spending_df)

customer_id,total_orders,total_spent,ADV,customer_segment
1617b1357756262bfa56ab541c47bc16,8,13440.0,1680.0,High-Value
9af2372a1e49340278e7c1ef8d749f34,29,11383.949999999995,392.55,Low-Value
de832e8dbb1f588a47013e53feaa67cc,15,10856.1,723.74,Medium-Value
63b964e79dee32a3587651701a2b8dbf,24,9888.0,412.0,Low-Value
6f241d5bbb142b6f764387c8c270645a,7,9520.14,1360.02,High-Value
926b6a6fb8b6081e00b335edaf578d35,2,7998.0,3999.0,High-Value
f959b7bc834045511217e6410985963f,6,7799.4,1299.9,High-Value
eb7a157e8da9c488cd4ddc48711f1097,2,7798.0,3899.0,High-Value
ec5b2ba62e574342386871631fafd3fc,4,7160.0,1790.0,High-Value
3118aefef04e5e97d0e339cd75d6d775,7,6909.0,987.0,Medium-Value


In [0]:
full_orders_df = full_orders_df.join(customer_spending_df.select('customer_id','customer_segment'),on='customer_id',how='left')

In [0]:
full_orders_df.select('customer_id', 'customer_segment').display(5)

customer_id,customer_segment
c77ee2d8ba1614a4d489a44166894938,Low-Value
3d3c463710ea6e8dd9a63c1110eeb06b,Low-Value
538a4d02876412846b966a3c057395e5,Low-Value
0a978c825ff7d013133ddc7f77566172,Low-Value
21a99191298d34fb6dd0b088e821591c,Low-Value
6ad71323c11ba8a83737ccc3ea31fbc3,Low-Value
0470c47f1dd7a91d0f3b8a420589e0f7,Low-Value
0f3a81be69f12da7e2979fd1833e923d,Low-Value
72d90899884781ae2fc19e49cc102fc0,Low-Value
a8f76d9cb0f8db57cbbfe8d67b257893,Low-Value


In [0]:
#Hourly Order Distribution

full_orders_df = full_orders_df.withColumn('hour_of_day', expr('hour(order_purchase_timestamp)'))

In [0]:
full_orders_df.select('order_purchase_timestamp','hour_of_day').show(5)

+------------------------+-----------+
|order_purchase_timestamp|hour_of_day|
+------------------------+-----------+
|     2017-11-18 19:28:06|         19|
|     2017-10-02 10:56:33|         10|
|     2018-02-13 21:18:39|         21|
|     2018-08-08 08:38:49|          8|
|     2017-07-09 21:57:05|         21|
+------------------------+-----------+
only showing top 5 rows


In [0]:
# Weekday vs weekend order
full_orders_df = full_orders_df.withColumn('order_day_type',\
    when(dayofweek('order_purchase_timestamp').isin(1,7), lit('Weekend')).otherwise(lit('Weekday')))

In [0]:
full_orders_df.select('order_purchase_timestamp','order_day_type').show()

+------------------------+--------------+
|order_purchase_timestamp|order_day_type|
+------------------------+--------------+
|     2017-05-16 19:41:10|       Weekday|
|     2018-01-02 19:00:43|       Weekday|
|     2017-01-23 18:29:09|       Weekday|
|     2017-11-18 19:28:06|       Weekend|
|     2017-10-02 10:56:33|       Weekday|
|     2018-02-13 21:18:39|       Weekday|
|     2017-07-29 11:55:02|       Weekend|
|     2018-08-08 08:38:49|       Weekday|
|     2017-04-11 12:22:08|       Weekday|
|     2018-03-01 14:14:28|       Weekday|
|     2018-06-07 19:03:12|       Weekday|
|     2017-07-09 21:57:05|       Weekend|
|     2017-11-21 00:03:41|       Weekday|
|     2018-07-25 17:44:10|       Weekday|
|     2017-10-26 15:54:26|       Weekday|
|     2018-06-07 10:06:19|       Weekday|
|     2017-09-18 14:31:30|       Weekday|
|     2018-07-24 20:41:37|       Weekday|
|     2017-05-16 13:10:30|       Weekday|
|     2017-12-26 23:41:31|       Weekday|
+------------------------+--------